/* cflags: */
/* linker: msg.o mtime.o test.o stat.o -lm -lpthread -lrt */
+#include <errno.h>
+#include <fcntl.h>
+#include <mqueue.h>
+#include <pthread.h>
+#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-
-#include <pthread.h>
+#include <sys/stat.h>
#include <unistd.h>
#include "msg.h"
dts_t *deltas = NULL;
int nb_measurements = 0;
+int current_mode = 0;
char *message = "Pipe latency";
void (*usage_ext) (FILE *) = NULL;
#define MAXBUF 1024
#define TIMER 1000
-pthread_barrier_t *barrier = NULL;
-pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;;
+sem_t _synchro;
+sem_t *synchro;
+#define SEMSYNC "/sem_sync"
+#define MQCOM "/test_com"
ts_t ts1, ts2;
int pipefd[2] = { 0 };
int rc = 0;
+#define RETURN(v) do { rc = (v); if (current_mode == 0) pthread_exit (NULL); else return NULL; } while (0)
+
void *ping (__attribute__((unused)) void *arg)
{
- int fdin = dup (pipefd[0]);
- int fdout = dup (pipefd[1]);
+ /* select pipe */
+
+ int fdout = pipefd[1];
+
+ /* open mq for communication betwen process */
+ mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_RDONLY) : -2;
+ if (mq == -1) {
+ fprintf (stderr, "ping error: mq_open\n");
+ RETURN (1);
+ }
/* main loop */
- pthread_barrier_wait (barrier);
printf ("Sending ping...\n");
- usleep (TIMER);
for (int i = -1; i < nb_measurements; i++) {
char *msg = get_msg (MSGLEN);
- pthread_mutex_lock (&mutex);
- pthread_mutex_unlock (&mutex);
-
- usleep (TIMER / 2);
+ sem_wait (synchro);
+ usleep (TIMER);
sys_timestamp (&ts1);
if (write (fdout, msg, MSGLEN) == -1) {
fprintf (stderr, "ping error: write (%d)\n", i);
- rc = 1;
- pthread_exit (NULL);
+ RETURN (1);
+ }
+
+ switch (current_mode) {
+ case 0:
+ /* done by thread pong */
+ break;
+ case 1:
+ if (mq_receive (mq, (char *)&ts2, sizeof (ts2), NULL) == -1) {
+ fprintf (stderr, "pong error: mq_receive (%d)\n", i);
+ RETURN (1);
+ }
+ if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+ break;
+ case 2:
+ /* to do */
+ break;
+ case 3:
+ /* can't append */
+ break;
}
- usleep (TIMER);
}
- close (fdin);
- close (fdout);
+ /* close communication between process */
- pthread_exit (NULL);
+ if (current_mode == 1) {
+ mq_close (mq);
+ }
+
+ RETURN (0);
}
void *pong (__attribute__((unused)) void *arg)
{
- int fdin = dup (pipefd[0]);
- int fdout = dup (pipefd[1]);
+ /* select pipe */
+
+ int fdin = pipefd[0];
+
+ /* open mq for communication betwen process */
+ mqd_t mq = (current_mode == 1) ? mq_open (MQCOM, O_WRONLY) : -2;
+ if (mq == -1) {
+ fprintf (stderr, "pong error: mq_open\n");
+ RETURN (1);
+ }
+
+ /* main loop */
- pthread_barrier_wait (barrier);
printf ("Receiving pong...\n");
for (int i = -1; i < nb_measurements; i++) {
- pthread_mutex_lock (&mutex);
usleep (TIMER);
- pthread_mutex_unlock (&mutex);
+ sem_post (synchro);
char buffer[MAXBUF] = { 0 };
if (read (fdin, buffer, sizeof(buffer)) == -1) {
fprintf (stderr, "pong error: read (%d)\n", i);
- rc = 1;
- pthread_exit (NULL);
+ RETURN (1);
}
-
sys_timestamp (&ts2);
- if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+
+ switch (current_mode) {
+ case 0:
+ if (i != -1) deltas[i] = diff_timestamp (&ts2, &ts1);
+ break;
+ case 1:
+ if (mq_send (mq, (char *)&ts2, sizeof (ts2), 0) == -1) {
+ fprintf (stderr, "ping error: mq_send (%d)\n", i);
+ RETURN (1);
+ }
+ break;
+ case 2:
+ /* can't append */
+ break;
+ case 3:
+ /* to do */
+ break;
+ }
}
- close (fdin);
- close (fdout);
+ /* close communication between process */
- pthread_exit (NULL);
+ if (current_mode == 1) {
+ mq_close (mq);
+ }
+
+ RETURN (0);
}
-int init (dts_t *buffer, int nb, pthread_barrier_t *synchro)
+int init (dts_t *buffer, int nb, int mode)
{
/* set global variables */
deltas = buffer;
nb_measurements = nb;
- barrier = synchro;
+ current_mode = mode;
+
+ /* synchronization by semaphore */
+
+ if (current_mode == 0) {
+ synchro = &_synchro;
+ if (sem_init (synchro, (current_mode > 0), 0) != 0) {
+ fprintf (stderr, "error: sem_init\n");
+ return 1;
+ }
+ } else {
+ synchro = sem_open (SEMSYNC, O_CREAT, S_IRUSR|S_IWUSR, 0);
+ if (synchro == NULL) {
+ fprintf (stderr, "error: sem_open\n");
+ return 1;
+ }
+ }
+
+ /* communication through mq */
+
+ if (current_mode == 1) {
+ struct mq_attr mq_attr = { 0 };
+ mq_attr.mq_flags = 0;
+ mq_attr.mq_maxmsg = 1;
+ mq_attr.mq_msgsize = sizeof (ts_t);
+ mq_attr.mq_curmsgs = 0;
+
+ if ((mq_unlink (MQCOM) != 0) && errno != ENOENT) {
+ fprintf (stderr, "error: mq_unlink\n");
+ return 1;
+ }
+ mqd_t mq = mq_open (MQCOM, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR, &mq_attr);
+ if (mq == -1) {
+ fprintf (stderr, "error: mq_open\n");
+ return 1;
+ }
+ mq_close (mq);
+ }
/* open pipe */
int finish ()
{
+ /* close pipe */
+
close (pipefd[0]);
close (pipefd[1]);
+ /* close semaphore */
+
+ if (current_mode) {
+ sem_destroy (synchro);
+ } else {
+ sem_close (synchro);
+ sem_unlink (SEMSYNC);
+ }
+
return rc;
}
#define _GNU_SOURCE
#include <assert.h>
+#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
+#include <semaphore.h>
+#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
#include <unistd.h>
#include "stat.h"
int do_stat = 0;
int excl_first = 0;
int hist_bin = 10;
+int mode = 0;
int nb = 1000;
int nb_cores = 0;
char *output = NULL;
int usage (int ret)
{
FILE *fd = ret ? stderr : stdout;
- fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-n int] [-o file] [-s]\n", progname);
+ fprintf (fd, "usage: %s [-a int] [-b int] [-d int] [-e int] [-h] [-k int] [-m int] [-n int] [-o file] [-s]\n", progname);
fprintf (fd, " -a: avoid aberrand valies (%g%%)\n", abe_per);
fprintf (fd, " -d: delay process start for (%ds)\n", delay);
fprintf (fd, " -b: nb bins for histogram (%d)\n", hist_bin);
fprintf (fd, " -e: exclude %d first tests\n", excl_first);
fprintf (fd, " -h: help message\n");
fprintf (fd, " -k: nb dedicated cores (%d)\n", nb_cores);
+ fprintf (fd, " -m: 0 for threads, 1 ping-pong process, 2 ping process, 3 pong process (%d)\n", mode);
fprintf (fd, " -n: nb measurements (%d)\n", nb);
fprintf (fd, " -o: output raw data (%s)\n", (output) ? output : "none");
fprintf (fd, " -s: display statistics (%s)\n", (do_stat) ? "yes" : "no");
}
/* launch ping and pong */
+
+#define SEMSIG "/sem_sig"
+#define SEMACT "/sem_act"
+sem_t _sem_sig, _sem_act;
+sem_t *sem_sig, *sem_act;
+
typedef struct {
+ int synchro;
int target;
void *(*func)(void *arg);
} launch_param_t;
if (sched_setaffinity (0, sizeof (cpu_set_t), &cpu_mask) != 0) {
fprintf (stderr, "error: sched_setaffinity (%d)\n", cpu);
rc = 1;
- pthread_exit (NULL);
+ if (mode == 0) pthread_exit (NULL); else return NULL;
+ }
+ }
+ if (param->synchro) {
+ switch (param->target) {
+ case 0:
+ sem_wait (sem_sig);
+ sem_post (sem_act);
+ break;
+ case 1:
+ sem_post (sem_sig);
+ sem_wait (sem_act);
+ break;
}
}
return param->func (arg);
}
nb_cores = atoi (arg);
break;
+ case 'm':
+ arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
+ if (arg == NULL) {
+ fprintf (stderr, "%s: no mode specified\n", progname);
+ return usage (1);
+ }
+ mode = atoi (arg);
+ break;
case 'n':
arg = (arg[2]) ? arg + 2 : (--argc > 0) ? *(++argv) : NULL;
if (arg == NULL) {
printf ("Test: %s\n", (message) ? message : "unknown");
printf ("Dedicated core(s): %d\n", nb_cores);
- pthread_barrier_t synchro;
- pthread_barrier_init (&synchro, NULL, 2);
-
- if (init (buffer, nb, &synchro)) {
+ if (init (buffer, nb, mode)) {
fprintf (stderr, "error on init\n");
return 1;
}
- pthread_t tid1;
- launch_param_t lparam_t1 = { 0 };
- lparam_t1.target = 0;
- lparam_t1.func = ping;
- if (pthread_create (&tid1, NULL, launch, &lparam_t1) != 0) {
- fprintf (stderr, "error on pthread_create\n");
- return 1;
- }
-
- pthread_t tid2;
- launch_param_t lparam_t2 = { 0 };
- lparam_t2.target = 1;
- lparam_t2.func = pong;
- if (pthread_create (&tid2, NULL, launch, &lparam_t2) != 0) {
- fprintf (stderr, "error on pthread_create\n");
- return 1;
+ launch_param_t sync_ping = { 1, 0, ping };
+ launch_param_t sync_pong = { 1, 1, pong };
+ launch_param_t async_ping = { 0, 0, ping };
+ launch_param_t async_pong = { 0, 0, pong };
+ switch (mode){
+ case 0:
+ printf ("thread mode\n");
+ if ((sem_init (sem_sig = &_sem_sig, 0, 0) != 0) || (sem_init (sem_act = &_sem_act, 0, 0) != 0)) {
+ fprintf (stderr, "error: sem_init\n");
+ return 1;
+ } else {
+ pthread_t tid1, tid2;
+ if (pthread_create (&tid1, NULL, launch, &sync_ping) != 0) {
+ fprintf (stderr, "error on pthread_create\n");
+ return 1;
+ }
+ if (pthread_create (&tid2, NULL, launch, &sync_pong) != 0) {
+ fprintf (stderr, "error on pthread_create\n");
+ return 1;
+ }
+ pthread_join (tid1, NULL);
+ pthread_join (tid2, NULL);
+ sem_destroy (sem_act);
+ sem_destroy (sem_sig);
+ }
+ break;
+ case 1:
+ printf ("process mode\n");
+ sem_sig = sem_open (SEMSIG, O_CREAT, S_IRUSR|S_IWUSR, 0);
+ sem_act = sem_open (SEMACT, O_CREAT, S_IRUSR|S_IWUSR, 0);
+ if ((sem_sig == NULL) || (sem_act == NULL)) {
+ fprintf (stderr, "error: sem_open\n");
+ return 1;
+ } else {
+ pid_t pid = fork ();
+ if (pid == -1) {
+ fprintf (stderr, "error: fork\n");
+ return 1;
+ } else if (pid > 0) {
+ launch (&sync_ping);
+ } else {
+ launch (&sync_pong);
+ }
+ sem_close (sem_act);
+ sem_unlink (SEMACT);
+ sem_close (sem_sig);
+ sem_unlink (SEMSIG);
+ if (pid == 0) {
+ return finish ();
+ }
+ if (kill (pid, SIGTERM) == 0) {
+ int wstatus;
+ if (waitpid (pid, &wstatus, WUNTRACED | WCONTINUED) == -1) {
+ fprintf (stderr, "error: waitpid\n");
+ }
+ }
+ }
+ break;
+ case 2:
+ printf ("ping mode\n");
+ launch (&async_ping);
+ break;
+ case 3:
+ printf ("pong mode\n");
+ launch (&async_pong);
+ return finish ();
}
- pthread_join (tid1, NULL);
- pthread_join (tid2, NULL);
-
- pthread_barrier_destroy (&synchro);
-
if (finish ()) {
printf ("\033[1;31mKO\033[0;0m\n");
return 1;