#include <unistd.h>
#include "core.h"
-#include "rdtsc.h"
#include "verbose.h"
#include "task.h"
-long long tsc_start;
-
-void task_stat (task_t * t, int condensed)
-{
- if (!t->is_sync) return;
-
- pthread_mutex_lock(&t->stats_mutex);
-
- if (condensed) {
- VERBOSE (coretools, INFO, PRINTF ("%s=[%lu]", t->name, (unsigned long) tsc2ms (t->current_exec_time)));
- } else {
- VERBOSE (coretools, INFO, PRINTF ("task %s prio=%d cpu=%s last run in %lu µs [count=%d overtimes=%lu overloads=%lu, min=%lu average=%lu max=%lu]\n",
- t->name, t->priority, t->cpu_list,
- (unsigned long) tsc2ms (t->current_exec_time), t->count, t->overtimes,
- t->overloads, (unsigned long) tsc2ms (t->min_exec_time),
- t->count ? (unsigned long) tsc2ms (t->cumul_exec_time / t->count) : 0,
- (unsigned long) tsc2ms (t->max_exec_time)));
- }
-
- pthread_mutex_unlock(&t->stats_mutex);
-}
-
-void *task_sync_runner (void *param)
-{
- sub_task_t *s = (sub_task_t *) param;
- task_t *t = s->task;
-
- cpu_setaffinity(t->cpu_list);
-
- if (t->priority) {
- enable_realtime (t->priority);
- }
- pthread_mutex_init (&s->condition_mutex, NULL);
- pthread_cond_init (&s->condition_cond, NULL);
-
- /* potential init function */
- if (t->init) t->init (s, s->sub_task_id);
- pthread_mutex_unlock (&t->init_mutex);
-
- while (1) {
- // Wait for the trigger
- pthread_mutex_lock (&s->condition_mutex);
- while (s->trigger == 0) {
- pthread_cond_wait (&s->condition_cond, &s->condition_mutex);
- }
- pthread_mutex_unlock (&s->condition_mutex);
-
- // Run the task and measure the execution time
-
- long long tsc1, tsc2;
-
- tsc1 = rdtsc ();
- t->task (s, s->sub_task_id);
- tsc2 = rdtsc ();
-
- // Update performances indicators
-
- pthread_mutex_lock (&t->stats_mutex);
-
- t->current_exec_time = tsc2diff (tsc2, tsc1);
- t->cumul_exec_time += t->current_exec_time;
- t->count++;
- s->count++;
-
- if (s->start_time_us) *(s->start_time_us) = tsc2ms(tsc2diff (tsc1, tsc_start));
- if (s->exec_time_us) *(s->exec_time_us) = tsc2ms(t->current_exec_time);
-
- if (t->current_exec_time > t->max_exec_time)
- t->max_exec_time = t->current_exec_time;
- if (t->current_exec_time / 10 > t->min_exec_time)
- t->min_exec_time = 0; // To avoid min=0
- if ((t->current_exec_time < t->min_exec_time) || (t->min_exec_time == 0))
- t->min_exec_time = t->current_exec_time;
-
- if (t->current_exec_time > t->max_allowed_time) {
- VERBOSE (coretools, WARNING, PRINTF ("task %s[%d] too slow %luus\n", t->name, s->sub_task_id, (unsigned long) tsc2ms (t->current_exec_time)));
- t->overtimes++;
- }
-
- if (s->count > 10 && t->current_exec_time > 10*t->max_allowed_time) {
- char ascii_time[80];
- time_t my_time = time(NULL);
- strftime(ascii_time, sizeof(ascii_time), "%T", localtime(&my_time));
- VERBOSE (coretools, ERROR, printf ("%s: task %s[%d], big overtime %luus\n", ascii_time, t->name, s->sub_task_id, (unsigned long) tsc2ms (t->current_exec_time)));
- }
-
- pthread_mutex_unlock (&t->stats_mutex);
-
- pthread_mutex_lock (&s->condition_mutex);
- s->trigger = 0;
- pthread_cond_signal (&s->condition_cond);
- pthread_mutex_unlock (&s->condition_mutex);
- }
-
- return NULL;
-}
-
-int task_trig (task_t * t)
-{
- assert (t != NULL);
- int i;
- for (i=0; i<t->nb_sub_tasks; i++) {
- sub_task_t *s = &t->sub_task_list[i];
- pthread_mutex_lock (&s->condition_mutex);
- while (s->trigger != 0) {
- pthread_cond_wait (&s->condition_cond, &s->condition_mutex);
- }
- s->trigger = 1;
- pthread_cond_signal (&s->condition_cond);
- pthread_mutex_unlock (&s->condition_mutex);
- }
- return 0;
-}
-
-int task_end (task_t * t)
-{
- assert (t != NULL);
- int i;
- for (i=0; i<t->nb_sub_tasks; i++) {
- sub_task_t *s = &t->sub_task_list[i];
- pthread_mutex_lock (&s->condition_mutex);
- while (s->trigger != 0) {
- pthread_cond_wait (&s->condition_cond, &s->condition_mutex);
- }
- pthread_mutex_unlock (&s->condition_mutex);
- }
- return 0;
-}
-
-int task_selective_run (task_t * t, int run_list[])
-{
- assert (t != NULL);
- int i;
-
- // Start all selected sub-tasks
-
- for (i=0; i<t->nb_sub_tasks; i++) {
- if (!run_list[i]) continue;
- sub_task_t *s = &t->sub_task_list[i];
- pthread_mutex_lock (&s->condition_mutex);
- while (s->trigger != 0) {
- pthread_cond_wait (&s->condition_cond, &s->condition_mutex);
- }
- s->trigger = 1;
- pthread_cond_signal (&s->condition_cond);
- pthread_mutex_unlock (&s->condition_mutex);
-
- }
-
- // Wait end of all selected sub-tasks
-
- for (i=0; i<t->nb_sub_tasks; i++) {
- if (!run_list[i]) continue;
- sub_task_t *s = &t->sub_task_list[i];
- pthread_mutex_lock (&s->condition_mutex);
- while (s->trigger != 0) {
- pthread_cond_wait (&s->condition_cond, &s->condition_mutex);
- }
- pthread_mutex_unlock (&s->condition_mutex);
- }
-
- return 0;
-}
-
-int task_try_trig (task_t * t)
-{
- assert (t != NULL);
-
- int i;
- int nb_running = 0;
- for (i=0; i<t->nb_sub_tasks; i++) nb_running += t->sub_task_list[i].trigger ? 1 : 0;
-
- if (nb_running != 0) {
- t->overloads++;
- return -1;
- }
-
- task_trig(t);
-
- return 0;
-}
-
-task_t *create_sync_task (const char *name, int (*task) (sub_task_t *, int),
- float max_allowed_time_us, int (*init) (sub_task_t *, int),
- int priority, int nb_sub_tasks, const char *cpu_list)
-{
- int i;
-
- task_t *t = calloc (1, sizeof (*t));
- assert (t);
-
- t->name = strdup (name);
- t->is_sync = 1;
- t->task = task;
- t->max_allowed_time = (long long) (max_allowed_time_us / tsc2ms (1));
- t->init = init;
- t->priority = priority;
- if (cpu_list) {
- t->cpu_list = strdup(cpu_list);
- for (i = 0; t->cpu_list[i] != 0; i++) {
- if (t->cpu_list[i] == ':') {
- t->cpu_list[i] = 0;
- int nb = atoi (t->cpu_list + i + 1);
- if (nb > 0)
- nb_sub_tasks = nb;
- }
- }
- }
- pthread_mutex_init (&t->stats_mutex, NULL);
- pthread_mutex_init (&t->init_mutex, NULL);
-
- t->sub_task_list = calloc (nb_sub_tasks, sizeof (sub_task_t));
- assert (t->sub_task_list);
- t->nb_sub_tasks = nb_sub_tasks;
-
- VERBOSE (coretools, INFO, PRINTF ("Creating task %s[%d] prio=%d cpu=%s ttl=%d\n", t->name, t->nb_sub_tasks, t->priority, t->cpu_list, (int)(max_allowed_time_us)));
-
- pthread_attr_t tattr;
- int ret = 0;
-
- size_t size = 1024 * 1024;
-
- /* initialized with default attributes */
- ret |= pthread_attr_init (&tattr);
-
- /* setting the size of the stack also */
- ret |= pthread_attr_setstacksize (&tattr, size);
-
- for (i=0; i<nb_sub_tasks; i++) {
- t->sub_task_list[i].task = t;
- t->sub_task_list[i].sub_task_id = i;
- pthread_mutex_lock (&t->init_mutex);
- pthread_create (&t->sub_task_list[i].thread, &tattr, task_sync_runner, &t->sub_task_list[i]);
- }
-
- return t;
-}
-
void *task_async_runner (void *param)
{
sub_task_t *s = (sub_task_t *) param;
if (t->priority) {
enable_realtime (t->priority);
}
- pthread_mutex_init (&s->condition_mutex, NULL);
- pthread_cond_init (&s->condition_cond, NULL);
while (!t->task (s, s->sub_task_id)) { }
assert (t);
t->name = strdup (name);
- t->is_sync = 0;
t->task = task;
t->priority = priority;
if (cpu_list) {
typedef struct {
pthread_t thread;
- pthread_mutex_t condition_mutex;
- pthread_cond_t condition_cond;
- int trigger;
struct _task_t *task;
int sub_task_id;
-
- int count;
-
- unsigned long *start_time_us;
- unsigned long *exec_time_us;
} sub_task_t;
typedef struct _task_t {
char *name;
- int is_sync;
int priority;
char *cpu_list;
sub_task_t *sub_task_list;
int nb_sub_tasks;
-
- // Init function
-
- int (*init) (sub_task_t *s, int subtask_id);
- pthread_mutex_t init_mutex;
-
- // Statistics section
-
- pthread_mutex_t stats_mutex;
-
- int count;
- long long min_exec_time;
- long long max_exec_time;
- long long current_exec_time;
- unsigned long long cumul_exec_time;
- unsigned long int overtimes;
- unsigned long int overloads;
} task_t;
-task_t *create_sync_task (const char *name, int (*task) (sub_task_t *, int),
- float max_allowed_time_us, int (*init) (sub_task_t *, int),
- int priority, int nb_sub_tasks, const char *cpu_list);
-
task_t *create_async_task (const char *name, int (*task) (sub_task_t *, int),
int priority, int nb_sub_tasks, const char *cpu_list);
-int task_trig (task_t * t);
-
-int task_try_trig (task_t * t);
-
-int task_end (task_t * t);
-
-int task_selective_run (task_t * t, int run_list[]);
-
-void task_stat (task_t * t, int condensed);
-
void kill_all_subtasks (task_t *t, int sig);
#endif /* __TASK_H__ */