From 5facdc983d04b341468f98d69ae75b8f948bb3b8 Mon Sep 17 00:00:00 2001 From: Laurent Mazet Date: Sat, 25 Oct 2025 22:13:01 +0200 Subject: [PATCH] new test (process) for pipe --- pipe_lat_multi.c | 197 ++++++++++++++++++++++++++++++ test_multi.c | 305 +++++++++++++++++++++++++++++++++++++++++++++++ test_multi.h | 16 +++ 3 files changed, 518 insertions(+) create mode 100644 pipe_lat_multi.c create mode 100644 test_multi.c create mode 100644 test_multi.h diff --git a/pipe_lat_multi.c b/pipe_lat_multi.c new file mode 100644 index 0000000..4977a8e --- /dev/null +++ b/pipe_lat_multi.c @@ -0,0 +1,197 @@ +/* depend: */ +/* cflags: */ +/* linker: msg.o mtime.o test.o stat.o -lm -lpthread -lrt */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "msg.h" +#include "mtime.h" +#include "test_multi.h" + +/* global variables */ + +dts_t *deltas = NULL; +int nb_measurements = 0; + +char *message = "Pipe latency"; +void (*usage_ext) (FILE *) = NULL; +int (*parse_arg_ext) (char *) = NULL; + +#define MSGLEN 128 +#define MAXBUF 1024 +#define TIMER 1000 + +sem_t synchro; +#define MQCOM "/test_com" +ts_t ts1, ts2; +int pipefd[2] = { 0 }; +int rc = 0; + +int ping (void) +{ + + /* open mq for communication betwen process */ + + mqd_t mq = mq_open (MQCOM, O_RDONLY); + if (mq == -1) { + fprintf (stderr, "ping error: mq_open\n"); + return 1; + } + + /* select pipe */ + + int fdin = pipefd[0]; + int fdout = pipefd[1]; + + /* main loop */ + + printf ("Sending ping...\n"); + + + for (int i = -1; i < nb_measurements; i++) { + + char *msg = get_msg (MSGLEN); + + sem_wait (&synchro); + usleep (TIMER); + + sys_timestamp (&ts1); + if (write (fdout, msg, MSGLEN) == -1) { + fprintf (stderr, "ping error: write (%d)\n", i); + return 1; + } + + if (mq_receive (mq, (char *)&ts2, sizeof (ts2), NULL) == -1) { + fprintf (stderr, "pong error: mq_receive (%d)\n", i); + return 1; + } + + if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1); + } + + /* close pipe */ + + close (fdin); + close (fdout); + + /* close communication between process */ + + mq_close (mq); + + return 0; +} + +int pong (void) +{ + + /* open mq for communication betwen process */ + + mqd_t mq = mq_open (MQCOM, O_WRONLY); + if (mq == -1) { + fprintf (stderr, "ping error: mq_open\n"); + return 1; + } + + /* select pipe */ + + int fdin = pipefd[0]; + int fdout = pipefd[1]; + + /* main loop */ + + printf ("Receiving pong...\n"); + + for (int i = -1; i < nb_measurements; i++) { + + usleep (TIMER); + sem_post (&synchro); + + char buffer[MAXBUF] = { 0 }; + if (read (fdin, buffer, sizeof(buffer)) == -1) { + fprintf (stderr, "pong error: read (%d)\n", i); + return 1; + } + + sys_timestamp (&ts2); + if (mq_send (mq, (char *)&ts2, sizeof (ts2), 0) == -1) { + fprintf (stderr, "ping error: mq_send (%d)\n", i); + return 1; + } + } + + /* close pipe */ + + close (fdin); + close (fdout); + + /* close communication between process */ + + mq_close (mq); + + return 0; +} + +int init (dts_t *buffer, int nb) +{ + + /* set global variables */ + + deltas = buffer; + nb_measurements = nb; + + /* synchronization by semaphore */ + + if (sem_init (&synchro, 1, 0) != 0) { + fprintf (stderr, "error: sem_init\n"); + return 1; + } + + /* communication through mq */ + + struct mq_attr mq_attr = { 0 }; + mq_attr.mq_flags = 0; + mq_attr.mq_maxmsg = 1; + mq_attr.mq_msgsize = sizeof (ts_t); + mq_attr.mq_curmsgs = 0; + + if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) { + fprintf (stderr, "error: mq_unlink\n"); + return 1; + } + mqd_t mq = mq_open (MQCOM, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR, &mq_attr); + if (mq == -1) { + fprintf (stderr, "error: mq_open\n"); + return 1; + } + mq_close (mq); + + /* open pipe */ + + if (pipe (pipefd) == -1) { + fprintf (stderr, "error: pipe\n"); + return 1; + } + + return 0; +} + +int finish () +{ + + /* erase message queue */ + + if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) { + fprintf (stderr, "error: mq_unlink\n"); + return 1; + } + + return 0; +} diff --git a/test_multi.c b/test_multi.c new file mode 100644 index 0000000..4719314 --- /dev/null +++ b/test_multi.c @@ -0,0 +1,305 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "stat.h" +#include "test.h" + +/* constants */ + +#define RT_PRIO 80 + +/* static variables */ + +char *progname = NULL; +char *version = "1.2"; + +double abe_per = 0; +int delay = 0; +int do_stat = 0; +int excl_first = 0; +int hist_bin = 10; +int mode = 0; +int nb = 1000; +int nb_cores = 0; +char *output = NULL; + +extern char *message; +extern void (*usage_ext) (FILE *); +extern int (*parse_arg_ext) (char *); +extern int rc; + +/* usage function */ + +int usage (int ret) +{ + FILE *fd = ret ? stderr : stdout; + fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-m int] [-n int] [-o file] [-s]\n", progname); + fprintf (fd, " -a: avoid aberrand valies (%g%%)\n", abe_per); + fprintf (fd, " -d: delay process start for (%ds)\n", delay); + fprintf (fd, " -b: nb bins for histogram (%d)\n", hist_bin); + fprintf (fd, " -e: exclude %d first tests\n", excl_first); + fprintf (fd, " -h: help message\n"); + fprintf (fd, " -k: nb dedicated cores (%d)\n", nb_cores); + fprintf (fd, " -m: 0 for ping-pong, 1 ping, 2 pong (%d)\n", mode); + fprintf (fd, " -n: nb measurements (%d)\n", nb); + fprintf (fd, " -o: output raw data (%s)\n", (output) ? output : "none"); + fprintf (fd, " -s: display statistics (%s)\n", (do_stat) ? "yes" : "no"); + if (usage_ext) { + fprintf (fd, "extension:\n"); + usage_ext (fd); + } + fprintf (fd, "%s version %s\n", progname, version); + + return ret; +} + +/* launch ping and pong */ + +int launch (int target, int (*func)(void *arg)) +{ + int cpu = (nb_cores == 1) ? 0 : (nb_cores == 2) ? target : -1; + if (cpu != -1) { + cpu_set_t cpu_mask; + CPU_ZERO (&cpu_mask); + CPU_SET (cpu, &cpu_mask); + if (sched_setaffinity (0, sizeof (cpu_set_t), &cpu_mask) != 0) { + fprintf (stderr, "error: sched_setaffinity (%d)\n", cpu); + return 1; + } + } + + /* synchronize two process */ + + return func (arg); +} + +/* main function */ + +int main (int argc, char *argv[]) +{ + + /* get basename */ + + char *pt = progname = argv[0]; + while (*pt) { + if ((*pt == '/') || (*pt == '\\')) { + progname = pt + 1; + } + pt++; + } + + /* process arguments */ + + while (argc-- > 1) { + char *arg = *(++argv); + if (arg[0] != '-') { + if ((!parse_arg_ext) || (parse_arg_ext (arg))) { + fprintf (stderr, "%s: invalid option -- %s\n", progname, arg); + return usage (1); + } + continue; + } + char c = arg[1]; + switch (c) { + case 'a': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no aberrant percent specified\n", progname); + return usage (1); + } + abe_per = strtod (arg, NULL); + break; + case 'b': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no number of bins specified\n", progname); + return usage (1); + } + hist_bin = atoi (arg); + break; + case 'd': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no delay specified\n", progname); + return usage (1); + } + delay = atoi (arg); + break; + case 'e': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no number of first to exclude specified\n", progname); + return usage (1); + } + excl_first = atoi (arg); + break; + case 'k': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no number of dedicated cores specified\n", progname); + return usage (1); + } + nb_cores = atoi (arg); + break; + case 'm': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no mode specified\n", progname); + return usage (1); + } + mode = atoi (arg); + break; + case 'n': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no number of measurements specified\n", progname); + return usage (1); + } + nb = atoi (arg); + break; + case 'o': + arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL; + if (arg == NULL) { + fprintf (stderr, "%s: no output file specified\n", progname); + return usage (1); + } + output = arg; + break; + case 's': + do_stat = 1; + break; + case 'v': + printf ("version: %s\n", version); + break; + case 'h': + default: + return usage (c != 'h'); + } + } + + /* real-time process */ + + struct sched_param param = {0}; + if (sched_getparam (0, ¶m) != 0) { + fprintf (stderr, "error: sched_getparam\n"); + return 1; + } + param.sched_priority = RT_PRIO; + int rc = sched_setscheduler (0, SCHED_FIFO, ¶m); /* non-preemptive */ + // int rc = sched_setscheduler (0, SCHED_RR, ¶m); /* preemptive */ + if (rc != 0) { + fprintf (stderr, "error: sched_setscheduler\n"); + return 1; + } + + /* main process */ + + dts_t *buffer = (dts_t *) calloc (nb, sizeof (dts_t)); + assert (buffer); + + if (delay > 0) { + printf ("Delay start for %ds\n", delay); + for (int i = 0; i < 100 * delay; i++) { + usleep (10 * 1000); + } + } + + printf ("Test: %s\n", (message) ? message : "unknown"); + printf ("Dedicated core(s): %d\n", nb_cores); + + if (init (buffer, nb)) { + fprintf (stderr, "error on init\n"); + return 1; + } + + sem_t sem_sig, sem_act; + if ((sem_init (&sem_sig, 1, 0) != 0) || (sem_init (&sem_sig, 1, 0) != 0)) { + fprintf (stderr, "error: sem_init\n"); + return 1; + } + + pid_t pid = (mode == 0 ) ? fork () : -2; + if (pid == -1) { + sem_wait (sem_sig); + sem_post (sem_act); + fprintf (stderr, "error: fork\n"); + return 1; + } else if ((pid == 0) || (mode == 2)) { + return launch (1, pong); + } + if (pid > 0) { + sem_post (sem_sig); + sem_wait (sem_act); + } + int rc = launch (0, ping); + + if ((pid > 0) && (kill (pid, SIGTERM) == 0)) { + int wstatus; + if (waitpid (pid, &wstatus, WUNTRACED | WCONTINUED) == -1) { + fprintf (stderr, "error: waitpid\n"); + } + } + + if (finish ()) { + printf ("\033[1;31mKO\033[0;0m\n"); + return 1; + } else { + printf ("\033[1;32mOK\033[0;0m\n"); + } + + if (output) { + FILE *fd = fopen (output, "w"); + assert (fd); + for (int i = 0; i < nb; i++) { + fprintf (fd, "%ld\n", buffer[i]); + } + fclose (fd); + } + + if (do_stat) { + + if (excl_first) { + for (int i = 0; i < nb - excl_first; i++) { + buffer[i] = buffer[i + excl_first]; + } + nb -= excl_first; + } + + if (abe_per != 0) { + int n = nb * abe_per / 100; + for (int j = 0; j < n / 2; j++) { + int imax = 0; + int imin = 0; + for (int i = 1; i < nb; i++) { + if (buffer[i] > buffer[imax]) { + imax = i; + } + if (buffer[i] < buffer[imin]) { + imin = i; + } + } + for (int i = 0, jump = 0; i < nb; i++) { + if ((i == imax) || (i == imin)) { + jump++; + } else { + buffer[i - jump] = buffer[i]; + } + } + nb -= 2; + } + } + + compute_statistics (buffer, nb, hist_bin); + } + + free (buffer); + + return 0; +} diff --git a/test_multi.h b/test_multi.h new file mode 100644 index 0000000..4124c30 --- /dev/null +++ b/test_multi.h @@ -0,0 +1,16 @@ +/* test module */ + +#ifndef __TEST_MULTI_H__ +#define __TEST_MULTI_H__ + +#include "mtime.h" + +int init (dts_t *buffer, int nb); + +int ping (void); + +int pong (void); + +int finish (); + +#endif /* __TEST_MULTI_H__ */ -- 2.30.2