From 5b8361077f4c60cfcf8594496e72b7f6b15d6225 Mon Sep 17 00:00:00 2001 From: Laurent Mazet Date: Thu, 30 Oct 2025 00:12:57 +0100 Subject: [PATCH] 2 pipe tests in 1 file --- pipe_lat.c | 171 +++++++++++++++++++++++++++++++++++++++++++---------- test.c | 130 +++++++++++++++++++++++++++++++--------- test.h | 2 +- 3 files changed, 243 insertions(+), 60 deletions(-) diff --git a/pipe_lat.c b/pipe_lat.c index c39b829..669975d 100644 --- a/pipe_lat.c +++ b/pipe_lat.c @@ -2,11 +2,15 @@ /* cflags: */ /* linker: msg.o mtime.o test.o stat.o -lm -lpthread -lrt */ +#include +#include +#include +#include +#include #include #include #include - -#include +#include #include #include "msg.h" @@ -17,6 +21,7 @@ dts_t *deltas = NULL; int nb_measurements = 0; +int current_mode = 0; char *message = "Pipe latency"; void (*usage_ext) (FILE *) = NULL; @@ -26,89 +31,180 @@ int (*parse_arg_ext) (char *) = NULL; #define MAXBUF 1024 #define TIMER 1000 -pthread_barrier_t *barrier = NULL; -pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;; +sem_t _synchro; +sem_t *synchro; +#define SEMSYNC "/sem_sync" +#define MQCOM "/test_com" ts_t ts1, ts2; int pipefd[2] = { 0 }; int rc = 0; +#define RETURN(v) do { rc = (v); if (current_mode == 0) pthread_exit (NULL); else return NULL; } while (0) + void *ping (__attribute__((unused)) void *arg) { - int fdin = dup (pipefd[0]); - int fdout = dup (pipefd[1]); + /* select pipe */ + + int fdout = pipefd[1]; + + /* open mq for communication betwen process */ + mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_RDONLY) : -2; + if (mq == -1) { + fprintf (stderr, "ping error: mq_open\n"); + RETURN (1); + } /* main loop */ - pthread_barrier_wait (barrier); printf ("Sending ping...\n"); - usleep (TIMER); for (int i = -1; i < nb_measurements; i++) { char *msg = get_msg (MSGLEN); - pthread_mutex_lock (&mutex); - pthread_mutex_unlock (&mutex); - - usleep (TIMER / 2); + sem_wait (synchro); + usleep (TIMER); sys_timestamp (&ts1); if (write (fdout, msg, MSGLEN) == -1) { fprintf (stderr, "ping error: write (%d)\n", i); - rc = 1; - pthread_exit (NULL); + RETURN (1); + } + + switch (current_mode) { + case 0: + /* done by thread pong */ + break; + case 1: + if (mq_receive (mq, (char *)&ts2, sizeof (ts2), NULL) == -1) { + fprintf (stderr, "pong error: mq_receive (%d)\n", i); + RETURN (1); + } + if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1); + break; + case 2: + /* to do */ + break; + case 3: + /* can't append */ + break; } - usleep (TIMER); } - close (fdin); - close (fdout); + /* close communication between process */ - pthread_exit (NULL); + if (current_mode == 1) { + mq_close (mq); + } + + RETURN (0); } void *pong (__attribute__((unused)) void *arg) { - int fdin = dup (pipefd[0]); - int fdout = dup (pipefd[1]); + /* select pipe */ + + int fdin = pipefd[0]; + + /* open mq for communication betwen process */ + mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_WRONLY) : -2; + if (mq == -1) { + fprintf (stderr, "pong error: mq_open\n"); + RETURN (1); + } + + /* main loop */ - pthread_barrier_wait (barrier); printf ("Receiving pong...\n"); for (int i = -1; i < nb_measurements; i++) { - pthread_mutex_lock (&mutex); usleep (TIMER); - pthread_mutex_unlock (&mutex); + sem_post (synchro); char buffer[MAXBUF] = { 0 }; if (read (fdin, buffer, sizeof(buffer)) == -1) { fprintf (stderr, "pong error: read (%d)\n", i); - rc = 1; - pthread_exit (NULL); + RETURN (1); } - sys_timestamp (&ts2); - if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1); + + switch (current_mode) { + case 0: + if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1); + break; + case 1: + if (mq_send (mq, (char *)&ts2, sizeof (ts2), 0) == -1) { + fprintf (stderr, "ping error: mq_send (%d)\n", i); + RETURN (1); + } + break; + case 2: + /* can't append */ + break; + case 3: + /* to do */ + break; + } } - close (fdin); - close (fdout); + /* close communication between process */ - pthread_exit (NULL); + if (current_mode == 1) { + mq_close (mq); + } + + RETURN (0); } -int init (dts_t *buffer, int nb, pthread_barrier_t *synchro) +int init (dts_t *buffer, int nb, int mode) { /* set global variables */ deltas = buffer; nb_measurements = nb; - barrier = synchro; + current_mode = mode; + + /* synchronization by semaphore */ + + if (current_mode == 0) { + synchro = &_synchro; + if (sem_init (synchro, (current_mode > 0), 0) != 0) { + fprintf (stderr, "error: sem_init\n"); + return 1; + } + } else { + synchro = sem_open (SEMSYNC, O_CREAT, S_IRUSR|S_IWUSR, 0); + if (synchro == NULL) { + fprintf (stderr, "error: sem_open\n"); + return 1; + } + } + + /* communication through mq */ + + if (current_mode == 1) { + struct mq_attr mq_attr = { 0 }; + mq_attr.mq_flags = 0; + mq_attr.mq_maxmsg = 1; + mq_attr.mq_msgsize = sizeof (ts_t); + mq_attr.mq_curmsgs = 0; + + if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) { + fprintf (stderr, "error: mq_unlink\n"); + return 1; + } + mqd_t mq = mq_open (MQCOM, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR, &mq_attr); + if (mq == -1) { + fprintf (stderr, "error: mq_open\n"); + return 1; + } + mq_close (mq); + } /* open pipe */ @@ -123,8 +219,19 @@ int init (dts_t *buffer, int nb, pthread_barrier_t *synchro) int finish () { + /* close pipe */ + close (pipefd[0]); close (pipefd[1]); + /* close semaphore */ + + if (current_mode) { + sem_destroy (synchro); + } else { + sem_close (synchro); + sem_unlink (SEMSYNC); + } + return rc; } diff --git a/test.c b/test.c index 2ca47b5..e82d5a1 100644 --- a/test.c +++ b/test.c @@ -1,10 +1,15 @@ #define _GNU_SOURCE #include +#include #include #include +#include +#include #include #include #include +#include +#include #include #include "stat.h" @@ -24,6 +29,7 @@ int delay = 0; int do_stat = 0; int excl_first = 0; int hist_bin = 10; +int mode = 0; int nb = 1000; int nb_cores = 0; char *output = NULL; @@ -38,13 +44,14 @@ extern int rc; int usage (int ret) { FILE *fd = ret ? stderr : stdout; - fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-n int] [-o file] [-s]\n", progname); + fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-m int] [-n int] [-o file] [-s]\n", progname); fprintf (fd, " -a: avoid aberrand valies (%g%%)\n", abe_per); fprintf (fd, " -d: delay process start for (%ds)\n", delay); fprintf (fd, " -b: nb bins for histogram (%d)\n", hist_bin); fprintf (fd, " -e: exclude %d first tests\n", excl_first); fprintf (fd, " -h: help message\n"); fprintf (fd, " -k: nb dedicated cores (%d)\n", nb_cores); + fprintf (fd, " -m: 0 for threads, 1 ping-pong process, 2 ping process, 3 pong process (%d)\n", mode); fprintf (fd, " -n: nb measurements (%d)\n", nb); fprintf (fd, " -o: output raw data (%s)\n", (output) ? output : "none"); fprintf (fd, " -s: display statistics (%s)\n", (do_stat) ? "yes" : "no"); @@ -58,7 +65,14 @@ int usage (int ret) } /* launch ping and pong */ + +#define SEMSIG "/sem_sig" +#define SEMACT "/sem_act" +sem_t _sem_sig, _sem_act; +sem_t *sem_sig, *sem_act; + typedef struct { + int synchro; int target; void *(*func)(void *arg); } launch_param_t; @@ -74,7 +88,19 @@ void *launch (void *arg) if (sched_setaffinity (0, sizeof (cpu_set_t), &cpu_mask) != 0) { fprintf (stderr, "error: sched_setaffinity (%d)\n", cpu); rc = 1; - pthread_exit (NULL); + if (mode == 0) pthread_exit (NULL); else return NULL; + } + } + if (param->synchro) { + switch (param->target) { + case 0: + sem_wait (sem_sig); + sem_post (sem_act); + break; + case 1: + sem_post (sem_sig); + sem_wait (sem_act); + break; } } return param->func (arg); @@ -148,6 +174,14 @@ int main (int argc, char *argv[]) } nb_cores = atoi (arg); break; + case 'm': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no mode specified\n", progname); + return usage (1); + } + mode = atoi (arg); + break; case 'n': arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; if (arg == NULL) { @@ -206,37 +240,79 @@ int main (int argc, char *argv[]) printf ("Test: %s\n", (message) ? message : "unknown"); printf ("Dedicated core(s): %d\n", nb_cores); - pthread_barrier_t synchro; - pthread_barrier_init (&synchro, NULL, 2); - - if (init (buffer, nb, &synchro)) { + if (init (buffer, nb, mode)) { fprintf (stderr, "error on init\n"); return 1; } - pthread_t tid1; - launch_param_t lparam_t1 = { 0 }; - lparam_t1.target = 0; - lparam_t1.func = ping; - if (pthread_create (&tid1, NULL, launch, &lparam_t1) != 0) { - fprintf (stderr, "error on pthread_create\n"); - return 1; - } - - pthread_t tid2; - launch_param_t lparam_t2 = { 0 }; - lparam_t2.target = 1; - lparam_t2.func = pong; - if (pthread_create (&tid2, NULL, launch, &lparam_t2) != 0) { - fprintf (stderr, "error on pthread_create\n"); - return 1; + launch_param_t sync_ping = { 1, 0, ping }; + launch_param_t sync_pong = { 1, 1, pong }; + launch_param_t async_ping = { 0, 0, ping }; + launch_param_t async_pong = { 0, 0, pong }; + switch (mode){ + case 0: + printf ("thread mode\n"); + if ((sem_init (sem_sig = &_sem_sig, 0, 0) != 0) || (sem_init (sem_act = &_sem_act, 0, 0) != 0)) { + fprintf (stderr, "error: sem_init\n"); + return 1; + } else { + pthread_t tid1, tid2; + if (pthread_create (&tid1, NULL, launch, &sync_ping) != 0) { + fprintf (stderr, "error on pthread_create\n"); + return 1; + } + if (pthread_create (&tid2, NULL, launch, &sync_pong) != 0) { + fprintf (stderr, "error on pthread_create\n"); + return 1; + } + pthread_join (tid1, NULL); + pthread_join (tid2, NULL); + sem_destroy (sem_act); + sem_destroy (sem_sig); + } + break; + case 1: + printf ("process mode\n"); + sem_sig = sem_open (SEMSIG, O_CREAT, S_IRUSR|S_IWUSR, 0); + sem_act = sem_open (SEMACT, O_CREAT, S_IRUSR|S_IWUSR, 0); + if ((sem_sig == NULL) || (sem_act == NULL)) { + fprintf (stderr, "error: sem_open\n"); + return 1; + } else { + pid_t pid = fork (); + if (pid == -1) { + fprintf (stderr, "error: fork\n"); + return 1; + } else if (pid > 0) { + launch (&sync_ping); + } else { + launch (&sync_pong); + } + sem_close (sem_act); + sem_unlink (SEMACT); + sem_close (sem_sig); + sem_unlink (SEMSIG); + if (pid == 0) { + return finish (); + } + if (kill (pid, SIGTERM) == 0) { + int wstatus; + if (waitpid (pid, &wstatus, WUNTRACED | WCONTINUED) == -1) { + fprintf (stderr, "error: waitpid\n"); + } + } + } + break; + case 2: + printf ("ping mode\n"); + launch (&async_ping); + break; + case 3: + printf ("pong mode\n"); + launch (&async_pong); + return finish (); } - pthread_join (tid1, NULL); - pthread_join (tid2, NULL); - - pthread_barrier_destroy (&synchro); - if (finish ()) { printf ("\033[1;31mKO\033[0;0m\n"); return 1; diff --git a/test.h b/test.h index f34e2eb..914d82d 100644 --- a/test.h +++ b/test.h @@ -7,7 +7,7 @@ #include "mtime.h" -int init (dts_t *buffer, int nb, pthread_barrier_t *synchro); +int init (dts_t *buffer, int nb, int mode); void *ping (void *arg); -- 2.30.2