2 pipe tests in 1 file
authorLaurent Mazet <mazet@softndesign.org>
Wed, 29 Oct 2025 23:12:57 +0000 (00:12 +0100)
committerLaurent Mazet <mazet@softndesign.org>
Wed, 29 Oct 2025 23:12:57 +0000 (00:12 +0100)
pipe_lat.c
test.c
test.h

index c39b829123810b896135ac5d04fb6c993611cf8b..669975dfd05d6bc180e692b8fd470d1d943c816e 100644 (file)
@@ -2,11 +2,15 @@
 /* cflags: */
 /* linker: msg.o mtime.o test.o stat.o -lm -lpthread -lrt */
 
+#include <errno.h>
+#include <fcntl.h>
+#include <mqueue.h>
+#include <pthread.h>
+#include <semaphore.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-
-#include <pthread.h>
+#include <sys/stat.h>
 #include <unistd.h>
 
 #include "msg.h"
@@ -17,6 +21,7 @@
 
 dts_t *deltas = NULL;
 int nb_measurements = 0;
+int current_mode = 0;
 
 char *message = "Pipe latency";
 void (*usage_ext) (FILE *) = NULL;
@@ -26,89 +31,180 @@ int (*parse_arg_ext) (char *) = NULL;
 #define MAXBUF 1024
 #define TIMER 1000
 
-pthread_barrier_t *barrier = NULL;
-pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;;
+sem_t _synchro;
+sem_t *synchro;
+#define SEMSYNC "/sem_sync"
+#define MQCOM "/test_com"
 ts_t ts1, ts2;
 int pipefd[2] = { 0 };
 int rc = 0;
 
+#define RETURN(v) do { rc = (v); if (current_mode == 0) pthread_exit (NULL); else return NULL; } while (0)
+
 void *ping (__attribute__((unused)) void *arg)
 {
 
-    int fdin = dup (pipefd[0]);
-    int fdout = dup (pipefd[1]);
+    /* select pipe */
+
+    int fdout = pipefd[1];
+
+    /* open mq for communication betwen process */
+    mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_RDONLY) : -2;
+    if (mq == -1) {
+        fprintf (stderr, "ping error: mq_open\n");
+        RETURN (1);
+    }
 
     /* main loop */
 
-    pthread_barrier_wait (barrier);
     printf ("Sending ping...\n");
 
-    usleep (TIMER);
 
     for (int i = -1; i < nb_measurements; i++) {
 
         char *msg = get_msg (MSGLEN);
 
-        pthread_mutex_lock (&mutex);
-        pthread_mutex_unlock (&mutex);
-
-        usleep (TIMER / 2);
+        sem_wait (synchro);
+        usleep (TIMER);
 
         sys_timestamp (&ts1);
         if (write (fdout, msg, MSGLEN) == -1) {
             fprintf (stderr, "ping error: write (%d)\n", i);
-            rc = 1;
-            pthread_exit (NULL);
+            RETURN (1);
+        }
+
+        switch (current_mode) {
+        case 0:
+            /* done by thread pong */
+            break;
+        case 1:
+            if (mq_receive (mq, (char *)&ts2, sizeof (ts2), NULL) == -1) {
+                fprintf (stderr, "pong error: mq_receive (%d)\n", i);
+                RETURN (1);
+            }
+            if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+            break;
+        case 2:
+            /* to do */
+            break;
+        case 3:
+            /* can't append */
+            break;
         }
-        usleep (TIMER);
     }
 
-    close (fdin);
-    close (fdout);
+    /* close communication between process */
 
-    pthread_exit (NULL);
+    if (current_mode == 1) {
+        mq_close (mq);
+    }
+
+    RETURN (0);
 }
 
 void *pong (__attribute__((unused)) void *arg)
 {
 
-    int fdin = dup (pipefd[0]);
-    int fdout = dup (pipefd[1]);
+    /* select pipe */
+
+    int fdin = pipefd[0];
+
+    /* open mq for communication betwen process */
+    mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_WRONLY) : -2;
+    if (mq == -1) {
+        fprintf (stderr, "pong error: mq_open\n");
+        RETURN (1);
+    }
+
+    /* main loop */
 
-    pthread_barrier_wait (barrier);
     printf ("Receiving pong...\n");
 
     for (int i = -1; i < nb_measurements; i++) {
 
-        pthread_mutex_lock (&mutex);
         usleep (TIMER);
-        pthread_mutex_unlock (&mutex);
+        sem_post (synchro);
 
         char buffer[MAXBUF] = { 0 };
         if (read (fdin, buffer, sizeof(buffer)) == -1) {
             fprintf (stderr, "pong error: read (%d)\n", i);
-            rc = 1;
-            pthread_exit (NULL);
+            RETURN (1);
         }
-
         sys_timestamp (&ts2);
-        if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+
+        switch (current_mode) {
+        case 0:
+            if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+            break;
+        case 1:
+            if (mq_send (mq, (char *)&ts2, sizeof (ts2), 0) == -1) {
+                fprintf (stderr, "ping error: mq_send (%d)\n", i);
+                RETURN (1);
+            }
+            break;
+        case 2:
+            /* can't append */
+            break;
+        case 3:
+            /* to do */
+            break;
+        }
     }
 
-    close (fdin);
-    close (fdout);
+    /* close communication between process */
 
-    pthread_exit (NULL);
+    if (current_mode == 1) {
+        mq_close (mq);
+    }
+
+    RETURN (0);
 }
 
-int init (dts_t *buffer, int nb, pthread_barrier_t *synchro)
+int init (dts_t *buffer, int nb, int mode)
 {
 
     /* set global variables */
 
     deltas = buffer;
     nb_measurements = nb;
-    barrier = synchro;
+    current_mode = mode;
+
+    /* synchronization by semaphore */
+
+    if (current_mode == 0) {
+        synchro = &_synchro;
+        if (sem_init (synchro, (current_mode > 0), 0) != 0) {
+            fprintf (stderr, "error: sem_init\n");
+            return 1;
+        }
+    } else {
+        synchro = sem_open (SEMSYNC, O_CREAT, S_IRUSR|S_IWUSR, 0);
+        if (synchro == NULL) {
+            fprintf (stderr, "error: sem_open\n");
+            return 1;
+        }
+    }
+
+    /* communication through mq */
+
+    if (current_mode == 1) {
+        struct mq_attr mq_attr = { 0 };
+        mq_attr.mq_flags = 0;
+        mq_attr.mq_maxmsg = 1;
+        mq_attr.mq_msgsize = sizeof (ts_t);
+        mq_attr.mq_curmsgs = 0;
+
+        if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) {
+            fprintf (stderr, "error: mq_unlink\n");
+            return 1;
+        }
+        mqd_t mq = mq_open (MQCOM, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR, &mq_attr);
+        if (mq == -1) {
+            fprintf (stderr, "error: mq_open\n");
+            return 1;
+        }
+        mq_close (mq);
+    }
 
     /* open pipe */
 
@@ -123,8 +219,19 @@ int init (dts_t *buffer, int nb, pthread_barrier_t *synchro)
 int finish ()
 {
 
+    /* close pipe */
+
     close (pipefd[0]);
     close (pipefd[1]);
 
+    /* close semaphore */
+
+    if (current_mode) {
+        sem_destroy (synchro);
+    } else {
+        sem_close (synchro);
+        sem_unlink (SEMSYNC);
+    }
+
     return rc;
 }
diff --git a/test.c b/test.c
index 2ca47b5358027eae238a2fc67f8eda1ab825554f..e82d5a14c651c660ec885ad6013a2606649c0733 100644 (file)
--- a/test.c
+++ b/test.c
@@ -1,10 +1,15 @@
 #define _GNU_SOURCE
 #include <assert.h>
+#include <fcntl.h>
 #include <pthread.h>
 #include <sched.h>
+#include <semaphore.h>
+#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
 #include <unistd.h>
 
 #include "stat.h"
@@ -24,6 +29,7 @@ int delay = 0;
 int do_stat = 0;
 int excl_first = 0;
 int hist_bin = 10;
+int mode = 0;
 int nb = 1000;
 int nb_cores = 0;
 char *output = NULL;
@@ -38,13 +44,14 @@ extern int rc;
 int usage (int ret)
 {
     FILE *fd = ret ? stderr : stdout;
-    fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-n int] [-o file] [-s]\n", progname);
+    fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-m int] [-n int] [-o file] [-s]\n", progname);
     fprintf (fd, " -a: avoid aberrand valies (%g%%)\n", abe_per);
     fprintf (fd, " -d: delay process start for (%ds)\n", delay);
     fprintf (fd, " -b: nb bins for histogram (%d)\n", hist_bin);
     fprintf (fd, " -e: exclude %d first tests\n", excl_first);
     fprintf (fd, " -h: help message\n");
     fprintf (fd, " -k: nb dedicated cores (%d)\n", nb_cores);
+    fprintf (fd, " -m: 0 for threads, 1 ping-pong process, 2 ping process, 3 pong process (%d)\n", mode);
     fprintf (fd, " -n: nb measurements (%d)\n", nb);
     fprintf (fd, " -o: output raw data (%s)\n", (output) ? output : "none");
     fprintf (fd, " -s: display statistics (%s)\n", (do_stat) ? "yes" : "no");
@@ -58,7 +65,14 @@ int usage (int ret)
 }
 
 /* launch ping and pong */
+
+#define SEMSIG "/sem_sig"
+#define SEMACT "/sem_act"
+sem_t _sem_sig, _sem_act;
+sem_t *sem_sig, *sem_act;
+
 typedef struct {
+    int synchro;
     int target;
     void *(*func)(void *arg);
 } launch_param_t;
@@ -74,7 +88,19 @@ void *launch (void *arg)
         if (sched_setaffinity (0, sizeof (cpu_set_t), &cpu_mask) != 0) {
             fprintf (stderr, "error: sched_setaffinity (%d)\n", cpu);
             rc = 1;
-            pthread_exit (NULL);
+            if (mode == 0) pthread_exit (NULL); else return NULL;
+        }
+    }
+    if (param->synchro) {
+        switch (param->target) {
+        case 0:
+            sem_wait (sem_sig);
+            sem_post (sem_act);
+            break;
+        case 1:
+            sem_post (sem_sig);
+            sem_wait (sem_act);
+            break;
         }
     }
     return param->func (arg);
@@ -148,6 +174,14 @@ int main (int argc, char *argv[])
             }
             nb_cores = atoi (arg);
             break;
+        case 'm':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no mode specified\n", progname);
+                return usage (1);
+            }
+            mode = atoi (arg);
+            break;
         case 'n':
             arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
             if (arg == NULL) {
@@ -206,37 +240,79 @@ int main (int argc, char *argv[])
     printf ("Test: %s\n", (message) ? message : "unknown");
     printf ("Dedicated core(s): %d\n", nb_cores);
 
-    pthread_barrier_t synchro;
-    pthread_barrier_init (&synchro, NULL, 2);
-
-    if (init (buffer, nb, &synchro)) {
+    if (init (buffer, nb, mode)) {
         fprintf (stderr, "error on init\n");
         return 1;
     }
 
-    pthread_t tid1;
-    launch_param_t lparam_t1 = { 0 };
-    lparam_t1.target = 0;
-    lparam_t1.func = ping;
-    if (pthread_create (&tid1, NULL, launch, &lparam_t1) != 0) {
-        fprintf (stderr, "error on pthread_create\n");
-        return 1;
-    }
-
-    pthread_t tid2;
-    launch_param_t lparam_t2 = { 0 };
-    lparam_t2.target = 1;
-    lparam_t2.func = pong;
-    if (pthread_create (&tid2, NULL, launch, &lparam_t2) != 0) {
-        fprintf (stderr, "error on pthread_create\n");
-        return 1;
+    launch_param_t sync_ping = { 1, 0, ping };
+    launch_param_t sync_pong = { 1, 1, pong };
+    launch_param_t async_ping = { 0, 0, ping };
+    launch_param_t async_pong = { 0, 0, pong };
+    switch (mode){
+    case 0:
+        printf ("thread mode\n");
+        if ((sem_init (sem_sig = &_sem_sig, 0, 0) != 0) || (sem_init (sem_act = &_sem_act, 0, 0) != 0)) {
+            fprintf (stderr, "error: sem_init\n");
+            return 1;
+        } else {
+            pthread_t tid1, tid2;
+            if (pthread_create (&tid1, NULL, launch, &sync_ping) != 0) {
+                fprintf (stderr, "error on pthread_create\n");
+                return 1;
+            }
+            if (pthread_create (&tid2, NULL, launch, &sync_pong) != 0) {
+                fprintf (stderr, "error on pthread_create\n");
+                return 1;
+            }
+            pthread_join (tid1, NULL);
+            pthread_join (tid2, NULL);
+            sem_destroy (sem_act);
+            sem_destroy (sem_sig);
+        }
+        break;
+    case 1:
+        printf ("process mode\n");
+        sem_sig = sem_open (SEMSIG, O_CREAT, S_IRUSR|S_IWUSR, 0);
+        sem_act = sem_open (SEMACT, O_CREAT, S_IRUSR|S_IWUSR, 0);
+        if ((sem_sig == NULL) || (sem_act == NULL)) {
+            fprintf (stderr, "error: sem_open\n");
+            return 1;
+        } else {
+            pid_t pid = fork ();
+            if (pid == -1) {
+                fprintf (stderr, "error: fork\n");
+                return 1;
+            } else if (pid > 0) {
+                launch (&sync_ping);
+            } else {
+                launch (&sync_pong);
+            }
+            sem_close (sem_act);
+            sem_unlink (SEMACT);
+            sem_close (sem_sig);
+            sem_unlink (SEMSIG);
+            if (pid == 0) {
+                return finish ();
+            }
+            if (kill (pid, SIGTERM) == 0) {
+                int wstatus;
+                if (waitpid (pid, &wstatus, WUNTRACED | WCONTINUED) == -1) {
+                    fprintf (stderr, "error: waitpid\n");
+                }
+            }
+        }
+        break;
+    case 2:
+        printf ("ping mode\n");
+        launch (&async_ping);
+        break;
+    case 3:
+        printf ("pong mode\n");
+        launch (&async_pong);
+        return finish ();
     }
 
-    pthread_join (tid1, NULL);
-    pthread_join (tid2, NULL);
-
-    pthread_barrier_destroy (&synchro);
-
     if (finish ()) {
         printf ("\033[1;31mKO\033[0;0m\n");
         return 1;
diff --git a/test.h b/test.h
index f34e2eb129ebda1aae34119294a3d9e59aa62cd4..914d82dcc90bb41d12c8fe6472f39dc4eb061e9c 100644 (file)
--- a/test.h
+++ b/test.h
@@ -7,7 +7,7 @@
 
 #include "mtime.h"
 
-int init (dts_t *buffer, int nb, pthread_barrier_t *synchro);
+int init (dts_t *buffer, int nb, int mode);
 
 void *ping (void *arg);