new test (process) for pipe
authorLaurent Mazet <mazet@softndesign.org>
Sat, 25 Oct 2025 20:13:01 +0000 (22:13 +0200)
committerLaurent Mazet <mazet@softndesign.org>
Sat, 25 Oct 2025 20:13:01 +0000 (22:13 +0200)
pipe_lat_multi.c [new file with mode: 0644]
test_multi.c [new file with mode: 0644]
test_multi.h [new file with mode: 0644]

diff --git a/pipe_lat_multi.c b/pipe_lat_multi.c
new file mode 100644 (file)
index 0000000..4977a8e
--- /dev/null
@@ -0,0 +1,197 @@
+/* depend: */
+/* cflags: */
+/* linker: msg.o mtime.o test.o stat.o -lm -lpthread -lrt */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <mqueue.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "msg.h"
+#include "mtime.h"
+#include "test_multi.h"
+
+/* global variables */
+
+dts_t *deltas = NULL;
+int nb_measurements = 0;
+
+char *message = "Pipe latency";
+void (*usage_ext) (FILE *) = NULL;
+int (*parse_arg_ext) (char *) = NULL;
+
+#define MSGLEN 128
+#define MAXBUF 1024
+#define TIMER 1000
+
+sem_t synchro;
+#define MQCOM "/test_com"
+ts_t ts1, ts2;
+int pipefd[2] = { 0 };
+int rc = 0;
+
+int ping (void)
+{
+
+    /* open mq for communication betwen process */
+
+    mqd_t mq = mq_open (MQCOM, O_RDONLY);
+    if (mq == -1) {
+        fprintf (stderr, "ping error: mq_open\n");
+        return 1;
+    }
+
+    /* select pipe */
+
+    int fdin = pipefd[0];
+    int fdout = pipefd[1];
+
+    /* main loop */
+
+    printf ("Sending ping...\n");
+
+
+    for (int i = -1; i < nb_measurements; i++) {
+
+        char *msg = get_msg (MSGLEN);
+
+        sem_wait (&synchro);
+        usleep (TIMER);
+
+        sys_timestamp (&ts1);
+        if (write (fdout, msg, MSGLEN) == -1) {
+            fprintf (stderr, "ping error: write (%d)\n", i);
+            return 1;
+        }
+
+        if (mq_receive (mq, (char *)&ts2, sizeof (ts2), NULL) == -1) {
+            fprintf (stderr, "pong error: mq_receive (%d)\n", i);
+            return 1;
+        }
+
+        if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+    }
+
+    /* close pipe */
+
+    close (fdin);
+    close (fdout);
+
+    /* close communication between process */
+
+    mq_close (mq);
+
+    return 0;
+}
+
+int pong (void)
+{
+
+    /* open mq for communication betwen process */
+
+    mqd_t mq = mq_open (MQCOM, O_WRONLY);
+    if (mq == -1) {
+        fprintf (stderr, "ping error: mq_open\n");
+        return 1;
+    }
+
+    /* select pipe */
+
+    int fdin = pipefd[0];
+    int fdout = pipefd[1];
+
+    /* main loop */
+
+    printf ("Receiving pong...\n");
+
+    for (int i = -1; i < nb_measurements; i++) {
+
+        usleep (TIMER);
+        sem_post (&synchro);
+
+        char buffer[MAXBUF] = { 0 };
+        if (read (fdin, buffer, sizeof(buffer)) == -1) {
+            fprintf (stderr, "pong error: read (%d)\n", i);
+            return 1;
+        }
+
+        sys_timestamp (&ts2);
+        if (mq_send (mq, (char *)&ts2, sizeof (ts2), 0) == -1) {
+            fprintf (stderr, "ping error: mq_send (%d)\n", i);
+            return 1;
+        }
+    }
+
+    /* close pipe */
+
+    close (fdin);
+    close (fdout);
+
+    /* close communication between process */
+
+    mq_close (mq);
+
+    return 0;
+}
+
+int init (dts_t *buffer, int nb)
+{
+
+    /* set global variables */
+
+    deltas = buffer;
+    nb_measurements = nb;
+
+    /* synchronization by semaphore */
+
+    if (sem_init (&synchro, 1, 0) != 0) {
+        fprintf (stderr, "error: sem_init\n");
+        return 1;
+    }
+
+    /* communication through mq */
+
+    struct mq_attr mq_attr = { 0 };
+    mq_attr.mq_flags = 0;
+    mq_attr.mq_maxmsg = 1;
+    mq_attr.mq_msgsize = sizeof (ts_t);
+    mq_attr.mq_curmsgs = 0;
+
+    if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) {
+        fprintf (stderr, "error: mq_unlink\n");
+        return 1;
+    }
+    mqd_t mq = mq_open (MQCOM, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR, &mq_attr);
+    if (mq == -1) {
+        fprintf (stderr, "error: mq_open\n");
+        return 1;
+    }
+    mq_close (mq);
+
+    /* open pipe */
+
+    if (pipe (pipefd) == -1) {
+        fprintf (stderr, "error: pipe\n");
+        return 1;
+    }
+
+    return 0;
+}
+
+int finish ()
+{
+
+    /* erase message queue */
+
+    if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) {
+        fprintf (stderr, "error: mq_unlink\n");
+        return 1;
+    }
+
+    return 0;
+}
diff --git a/test_multi.c b/test_multi.c
new file mode 100644 (file)
index 0000000..4719314
--- /dev/null
@@ -0,0 +1,305 @@
+#include <assert.h>
+#include <sched.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include "stat.h"
+#include "test.h"
+
+/* constants */
+
+#define RT_PRIO 80
+
+/* static variables */
+
+char *progname = NULL;
+char *version = "1.2";
+
+double abe_per = 0;
+int delay = 0;
+int do_stat = 0;
+int excl_first = 0;
+int hist_bin = 10;
+int mode = 0;
+int nb = 1000;
+int nb_cores = 0;
+char *output = NULL;
+
+extern char *message;
+extern void (*usage_ext) (FILE *);
+extern int (*parse_arg_ext) (char *);
+extern int rc;
+
+/* usage function */
+
+int usage (int ret)
+{
+    FILE *fd = ret ? stderr : stdout;
+    fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-m int] [-n int] [-o file] [-s]\n", progname);
+    fprintf (fd, " -a: avoid aberrand valies (%g%%)\n", abe_per);
+    fprintf (fd, " -d: delay process start for (%ds)\n", delay);
+    fprintf (fd, " -b: nb bins for histogram (%d)\n", hist_bin);
+    fprintf (fd, " -e: exclude %d first tests\n", excl_first);
+    fprintf (fd, " -h: help message\n");
+    fprintf (fd, " -k: nb dedicated cores (%d)\n", nb_cores);
+    fprintf (fd, " -m: 0 for ping-pong, 1 ping, 2 pong (%d)\n", mode);
+    fprintf (fd, " -n: nb measurements (%d)\n", nb);
+    fprintf (fd, " -o: output raw data (%s)\n", (output) ? output : "none");
+    fprintf (fd, " -s: display statistics (%s)\n", (do_stat) ? "yes" : "no");
+    if (usage_ext) {
+        fprintf (fd, "extension:\n");
+        usage_ext (fd);
+    }
+    fprintf (fd, "%s version %s\n", progname, version);
+
+    return ret;
+}
+
+/* launch ping and pong */
+
+int launch (int target, int (*func)(void *arg))
+{
+    int cpu = (nb_cores == 1) ? 0 : (nb_cores == 2) ? target : -1;
+    if (cpu != -1) {
+        cpu_set_t cpu_mask;
+        CPU_ZERO (&cpu_mask);
+        CPU_SET (cpu, &cpu_mask);
+        if (sched_setaffinity (0, sizeof (cpu_set_t), &cpu_mask) != 0) {
+            fprintf (stderr, "error: sched_setaffinity (%d)\n", cpu);
+            return 1;
+        }
+    }
+
+    /* synchronize two process */
+
+    return func (arg);
+}
+
+/* main function */
+
+int main (int argc, char *argv[])
+{
+
+    /* get basename */
+
+    char *pt = progname = argv[0];
+    while (*pt) {
+        if ((*pt == '/') || (*pt == '\\')) {
+           progname = pt + 1;
+        }
+        pt++;
+    }
+
+    /* process arguments */
+
+    while (argc-- > 1) {
+        char *arg = *(++argv);
+        if (arg[0] != '-') {
+            if ((!parse_arg_ext) || (parse_arg_ext (arg))) {
+                fprintf (stderr, "%s: invalid option -- %s\n", progname, arg);
+                return usage (1);
+            }
+            continue;
+        }
+        char c = arg[1];
+        switch (c) {
+        case 'a':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no aberrant percent specified\n", progname);
+                return usage (1);
+            }
+            abe_per = strtod (arg, NULL);
+            break;
+        case 'b':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no number of bins specified\n", progname);
+                return usage (1);
+            }
+            hist_bin = atoi (arg);
+            break;
+        case 'd':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no delay specified\n", progname);
+                return usage (1);
+            }
+            delay = atoi (arg);
+            break;
+        case 'e':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no number of first to exclude specified\n", progname);
+                return usage (1);
+            }
+            excl_first = atoi (arg);
+            break;
+        case 'k':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no number of dedicated cores specified\n", progname);
+                return usage (1);
+            }
+            nb_cores = atoi (arg);
+            break;
+        case 'm':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no mode specified\n", progname);
+                return usage (1);
+            }
+            mode = atoi (arg);
+            break;
+        case 'n':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no number of measurements specified\n", progname);
+                return usage (1);
+            }
+            nb = atoi (arg);
+            break;
+        case 'o':
+            arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+            if (arg == NULL) {
+                fprintf (stderr, "%s: no output file specified\n", progname);
+                return usage (1);
+            }
+            output = arg;
+            break;
+        case 's':
+            do_stat = 1;
+            break;
+        case 'v':
+            printf ("version: %s\n", version);
+            break;
+        case 'h':
+        default:
+            return usage (c != 'h');
+        }
+    }
+
+    /* real-time process */
+
+    struct sched_param param = {0};
+    if (sched_getparam (0, &param) != 0) {
+        fprintf (stderr, "error: sched_getparam\n");
+        return 1;
+    }
+    param.sched_priority = RT_PRIO;
+    int rc = sched_setscheduler (0, SCHED_FIFO, &param); /* non-preemptive */
+    // int rc = sched_setscheduler (0, SCHED_RR, &param); /* preemptive */
+    if (rc != 0) {
+        fprintf (stderr, "error: sched_setscheduler\n");
+        return 1;
+    }
+
+    /* main process */
+
+    dts_t *buffer = (dts_t *) calloc (nb, sizeof (dts_t));
+    assert (buffer);
+
+    if (delay > 0) {
+        printf ("Delay start for %ds\n", delay);
+        for (int i = 0; i < 100 * delay; i++) {
+            usleep (10 * 1000);
+        }
+    }
+
+    printf ("Test: %s\n", (message) ? message : "unknown");
+    printf ("Dedicated core(s): %d\n", nb_cores);
+
+    if (init (buffer, nb)) {
+        fprintf (stderr, "error on init\n");
+        return 1;
+    }
+
+    sem_t sem_sig, sem_act;
+    if ((sem_init (&sem_sig, 1, 0) != 0) || (sem_init (&sem_sig, 1, 0) != 0)) {
+        fprintf (stderr, "error: sem_init\n");
+        return 1;
+    }
+
+    pid_t pid = (mode == 0 ) ? fork () : -2;
+    if (pid == -1) {
+        sem_wait (sem_sig);
+        sem_post (sem_act);
+        fprintf (stderr, "error: fork\n");
+        return 1;
+    } else if ((pid == 0) || (mode == 2)) {
+        return launch (1, pong);
+    }
+    if (pid > 0) {
+        sem_post (sem_sig);
+        sem_wait (sem_act);
+    }
+    int rc = launch (0, ping);
+
+    if ((pid > 0) && (kill (pid, SIGTERM) == 0)) {
+        int wstatus;
+        if (waitpid (pid, &wstatus, WUNTRACED | WCONTINUED) == -1) {
+            fprintf (stderr, "error: waitpid\n");
+        }
+    }
+
+    if (finish ()) {
+        printf ("\033[1;31mKO\033[0;0m\n");
+        return 1;
+    } else {
+        printf ("\033[1;32mOK\033[0;0m\n");
+    }
+
+    if (output) {
+        FILE *fd = fopen (output, "w");
+        assert (fd);
+        for (int i = 0; i < nb; i++) {
+            fprintf (fd, "%ld\n", buffer[i]);
+        }
+        fclose (fd);
+    }
+
+    if (do_stat) {
+
+        if (excl_first) {
+            for (int i = 0; i < nb - excl_first; i++) {
+                buffer[i] = buffer[i + excl_first];
+            }
+            nb -= excl_first;
+        }
+
+        if (abe_per != 0) {
+            int n = nb * abe_per / 100;
+            for (int j = 0; j < n / 2; j++) {
+                int imax = 0;
+                int imin = 0;
+                for (int i = 1; i < nb; i++) {
+                    if (buffer[i] > buffer[imax]) {
+                        imax = i;
+                    }
+                    if (buffer[i] < buffer[imin]) {
+                        imin = i;
+                    }
+                }
+                for (int i = 0, jump = 0; i < nb; i++) {
+                    if ((i == imax) || (i == imin)) {
+                        jump++;
+                    } else {
+                        buffer[i - jump] = buffer[i];
+                    }
+                }
+                nb -= 2;
+            }
+        }
+
+        compute_statistics (buffer, nb, hist_bin);
+    }
+
+    free (buffer);
+
+    return 0;
+}
diff --git a/test_multi.h b/test_multi.h
new file mode 100644 (file)
index 0000000..4124c30
--- /dev/null
@@ -0,0 +1,16 @@
+/* test module */
+
+#ifndef __TEST_MULTI_H__
+#define __TEST_MULTI_H__
+
+#include "mtime.h"
+
+int init (dts_t *buffer, int nb);
+
+int ping (void);
+
+int pong (void);
+
+int finish ();
+
+#endif /* __TEST_MULTI_H__ */