00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "defs.h"
00022 #include "uem.h"
00023 #include <sys/msg.h>
00024 #include <sys/sem.h>
00025 #ifdef POSIX_IPC
00026 #include <mqueue.h>
00027 #endif
00028
/* Guards event_head and every event linked from it. */
static pthread_mutex_t uem_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by queue_event() each time an event is appended. */
static pthread_cond_t uem_cond_event = PTHREAD_COND_INITIALIZER;
/* First pending event of the singly linked queue; NULL when empty. */
static struct uem_event *event_head = NULL;
00032
00033 static struct uem_reg *new_reg(void)
00034 {
00035 struct uem_reg *p;
00036
00037 ec_null( p = calloc(1, sizeof(struct uem_reg)) )
00038 return p;
00039
00040 EC_CLEANUP_BGN
00041 return NULL;
00042 EC_CLEANUP_END
00043 }
00044
00045
00046
00047 static bool queue_event(struct uem_event *e)
00048 {
00049 struct uem_event *cur;
00050
00051 ec_rv( pthread_mutex_lock(&uem_mtx) )
00052 if (event_head == NULL)
00053 event_head = e;
00054 else {
00055 for (cur = event_head; cur->ue_next != NULL; cur = cur->ue_next)
00056
00057 if (e->ue_errno != 0 &&
00058 cur->ue_reg->ur_type == e->ue_reg->ur_type &&
00059 cur->ue_errno == e->ue_errno) {
00060 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00061 uem_free(e);
00062 return true;
00063 }
00064 cur->ue_next = e;
00065 }
00066 ec_rv( pthread_cond_signal(&uem_cond_event) )
00067 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00068 return true;
00069
00070 EC_CLEANUP_BGN
00071 (void)pthread_mutex_unlock(&uem_mtx);
00072 return false;
00073 EC_CLEANUP_END
00074 }
00075
00076 static bool dequeue_event(struct uem_reg *p)
00077 {
00078 struct uem_event *cur, *prev, *next;
00079
00080 ec_rv( pthread_mutex_lock(&uem_mtx) )
00081 for (cur = event_head; cur != NULL; cur = next) {
00082 next = cur->ue_next;
00083 if (cur->ue_reg == p) {
00084 if (prev == NULL)
00085 event_head = next;
00086 else
00087 prev->ue_next = next;
00088 uem_free(cur);
00089 }
00090 else
00091 prev = cur;
00092 }
00093 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00094 return true;
00095
00096 EC_CLEANUP_BGN
00097 (void)pthread_mutex_unlock(&uem_mtx);
00098 return false;
00099 EC_CLEANUP_END
00100 }
00101
/*
 * Thread cancellation cleanup routine: releases the event passed as arg
 * (uem_free() ignores a NULL argument).
 */
static void cleanup_handler(void *arg)
{
	struct uem_event *pending = (struct uem_event *)arg;

	uem_free(pending);
}
00106
00107
00108 static void free_svmsg(struct uem_event *e)
00109 {
00110 free(e->ue_buf);
00111 }
00112
00113
00114 static void *thread_svmsg(void *arg)
00115 {
00116 struct uem_event *e = NULL;
00117
00118 pthread_cleanup_push(cleanup_handler, e);
00119 while (true) {
00120 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00121 e->ue_reg = (struct uem_reg *)arg;
00122 ec_null( e->ue_buf = malloc(e->ue_reg->ur_size) )
00123 if ((e->ue_result = msgrcv(e->ue_reg->ur_resource.ur_mqid,
00124 e->ue_buf, e->ue_reg->ur_size - sizeof(long), 0, 0)) == -1)
00125 e->ue_errno = errno;
00126 ec_false( queue_event(e) )
00127 }
00128 pthread_cleanup_pop(false);
00129 return NULL;
00130
00131 EC_CLEANUP_BGN
00132 uem_free(e);
00133 EC_FLUSH("thread_svmsg")
00134 return NULL;
00135 EC_CLEANUP_END
00136 }
00137
00138 bool uem_register_svmsg(int mqid, size_t msgsize, void *data)
00139 {
00140 struct uem_reg *p;
00141
00142 ec_null( p = new_reg() )
00143 p->ur_type = UEM_SVMSG;
00144 p->ur_resource.ur_mqid = mqid;
00145 p->ur_size = msgsize;
00146 p->ur_data = data;
00147 ec_rv( pthread_create(&p->ur_tid, NULL, thread_svmsg, p) )
00148 return true;
00149
00150 EC_CLEANUP_BGN
00151 return false;
00152 EC_CLEANUP_END
00153 }
00154
00155 static void free_svsem_reg(struct uem_reg *p)
00156 {
00157 }
00158
00159 static void *thread_svsem(void *arg)
00160 {
00161 struct uem_event *e = NULL;
00162
00163 pthread_cleanup_push(cleanup_handler, e);
00164 while (true) {
00165 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00166 e->ue_reg = (struct uem_reg *)arg;
00167 if ((e->ue_result = semop(e->ue_reg->ur_resource.ur_svsem.s_semid,
00168 e->ue_reg->ur_resource.ur_svsem.s_sops, e->ue_reg->ur_size)) ==
00169 -1)
00170 e->ue_errno = errno;
00171 ec_false( queue_event(e) )
00172 }
00173 pthread_cleanup_pop(false);
00174 return NULL;
00175
00176 EC_CLEANUP_BGN
00177 uem_free(e);
00178 EC_FLUSH("thread_svsem")
00179 return NULL;
00180 EC_CLEANUP_END
00181 }
00182
00183 bool uem_register_svsem(int semid, struct sembuf *sops, size_t nsops,
00184 void *data)
00185 {
00186 struct uem_reg *p;
00187
00188 ec_null( p = new_reg() )
00189 p->ur_type = UEM_SVSEM;
00190 p->ur_resource.ur_svsem.s_semid = semid;
00191 ec_null( p->ur_resource.ur_svsem.s_sops =
00192 calloc(1, sizeof(struct sembuf) * nsops) )
00193 memcpy(p->ur_resource.ur_svsem.s_sops, sops,
00194 sizeof(struct sembuf) * nsops);
00195 p->ur_size = nsops;
00196 p->ur_data = data;
00197 ec_rv( pthread_create(&p->ur_tid, NULL, thread_svsem, p) )
00198 return true;
00199
00200 EC_CLEANUP_BGN
00201 return false;
00202 EC_CLEANUP_END
00203 }
00204
00205 #ifdef POSIX_IPC
00206 static void *thread_pxmsg(void *arg)
00207 {
00208 struct uem_event *e = NULL;
00209 struct mq_attr attr;
00210
00211 pthread_cleanup_push(cleanup_handler, e);
00212 while (true) {
00213 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00214 e->ue_reg = (struct uem_reg *)arg;
00215 ec_neg1( mq_getattr(e->ue_reg->ur_resource.ur_mqd, &attr) )
00216 ec_null( e->ue_buf = malloc(attr.mq_msgsize) )
00217 if ((e->ue_result = mq_receive(e->ue_reg->ur_resource.ur_mqd,
00218 e->ue_buf, attr.mq_msgsize, NULL)) == -1)
00219 e->ue_errno = errno;
00220 ec_false( queue_event(e) )
00221 }
00222 pthread_cleanup_pop(false);
00223 return NULL;
00224
00225 EC_CLEANUP_BGN
00226 uem_free(e);
00227 EC_FLUSH("thread_pxmsg")
00228 return NULL;
00229 EC_CLEANUP_END
00230 }
00231
00232 bool uem_register_pxmsg(mqd_t mqd, void *data)
00233 {
00234 struct uem_reg *p;
00235
00236 ec_null( p = new_reg() )
00237 p->ur_type = UEM_PXMSG;
00238 p->ur_resource.ur_mqd = mqd;
00239 p->ur_data = data;
00240 ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxmsg, p) )
00241 return true;
00242
00243 EC_CLEANUP_BGN
00244 return false;
00245 EC_CLEANUP_END
00246 }
00247
00248 static void *thread_pxsem(void *arg)
00249 {
00250 struct uem_event *e = NULL;
00251
00252 pthread_cleanup_push(cleanup_handler, e);
00253 while (true) {
00254 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00255 e->ue_reg = (struct uem_reg *)arg;
00256 if ((e->ue_result = sem_wait(e->ue_reg->ur_resource.ur_sem)) == -1)
00257 e->ue_errno = errno;
00258 ec_false( queue_event(e) )
00259 }
00260 pthread_cleanup_pop(false);
00261 return NULL;
00262
00263 EC_CLEANUP_BGN
00264 uem_free(e);
00265 EC_FLUSH("thread_pxsem")
00266 return NULL;
00267 EC_CLEANUP_END
00268 }
00269
00270 bool uem_register_pxsem(sem_t *sem, void *data)
00271 {
00272 struct uem_reg *p;
00273
00274 ec_null( p = new_reg() )
00275 p->ur_type = UEM_PXSEM;
00276 p->ur_resource.ur_sem = sem;
00277 p->ur_data = data;
00278 ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxsem, p) )
00279 return true;
00280
00281 EC_CLEANUP_BGN
00282 return false;
00283 EC_CLEANUP_END
00284 }
00285 #endif
00286
00287
00288
00289
00290
00291
00292
00293
00294 static void *thread_fdset(void *arg)
00295 {
00296 struct uem_event *e = NULL;
00297 struct uem_reg *p = (struct uem_reg *)arg;
00298 fd_set fdset, *fdset_read = NULL, *fdset_write = NULL, *fdset_error = NULL;
00299 int i;
00300
00301 pthread_cleanup_push(cleanup_handler, e);
00302 while (true) {
00303
00304
00305
00306 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00307 e->ue_reg = p;
00308 fdset = p->ur_resource.ur_fdset;
00309 switch(p->ur_type) {
00310 case UEM_FD_READ:
00311 fdset_read = &fdset;
00312 break;
00313 case UEM_FD_WRITE:
00314 fdset_write = &fdset;
00315 break;
00316 case UEM_FD_ERROR:
00317 fdset_error = &fdset;
00318 break;
00319 default:
00320 errno = EINVAL;
00321 EC_FAIL
00322 }
00323 if (select(p->ur_size, fdset_read, fdset_write, fdset_error, NULL)
00324 == -1) {
00325 e->ue_errno = errno;
00326 ec_false( queue_event(e) )
00327 }
00328 else {
00329 for (i = 0; i < p->ur_size; i++)
00330 if (FD_ISSET(i, &fdset)) {
00331 struct uem_event *cur;
00332
00333 ec_rv( pthread_mutex_lock(&uem_mtx) )
00334 for (cur = event_head; cur != NULL; cur = cur->ue_next)
00335 if (cur->ue_reg->ur_type == p->ur_type &&
00336 cur->ue_result == i)
00337 break;
00338 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00339 if (cur != NULL)
00340 continue;
00341 if (e == NULL) {
00342 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00343 e->ue_reg = p;
00344 }
00345 e->ue_result = i;
00346 ec_false( queue_event(e) )
00347 e = NULL;
00348 }
00349 }
00350 }
00351 pthread_cleanup_pop(false);
00352 return NULL;
00353
00354 EC_CLEANUP_BGN
00355 (void)pthread_mutex_unlock(&uem_mtx);
00356 uem_free(e);
00357 EC_FLUSH("thread_fdset")
00358 return NULL;
00359 EC_CLEANUP_END
00360 }
00361
00362 bool uem_register_fdset(int nfds, fd_set *fdset, enum UEM_TYPE type,
00363 void *data)
00364 {
00365 struct uem_reg *p;
00366
00367 switch (type) {
00368 case UEM_FD_READ:
00369 case UEM_FD_WRITE:
00370 case UEM_FD_ERROR:
00371 ec_null( p = new_reg() )
00372 p->ur_type = type;
00373 p->ur_resource.ur_fdset = *fdset;
00374 p->ur_size = nfds;
00375 p->ur_data = data;
00376 ec_rv( pthread_create(&p->ur_tid, NULL, thread_fdset, p) )
00377 return true;
00378 default:
00379 errno = EINVAL;
00380 EC_FAIL
00381 }
00382
00383 EC_CLEANUP_BGN
00384 return false;
00385 EC_CLEANUP_END
00386 }
00387
00388 static void *thread_signal(void *arg)
00389 {
00390 struct uem_event *e = NULL;
00391 sigset_t set;
00392 int signum;
00393
00394 pthread_cleanup_push(cleanup_handler, e);
00395 ec_neg1( sigemptyset(&set) )
00396 ec_neg1( sigaddset(&set,
00397 ((struct uem_reg *)arg)->ur_resource.ur_signum) )
00398 while (true) {
00399 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00400 e->ue_reg = (struct uem_reg *)arg;
00401 e->ue_errno = sigwait(&set, &signum);
00402 if (e->ue_errno == 0)
00403 e->ue_result = signum;
00404 ec_false( queue_event(e) )
00405 }
00406 pthread_cleanup_pop(false);
00407 return NULL;
00408
00409 EC_CLEANUP_BGN
00410 uem_free(e);
00411 EC_FLUSH("thread_signal")
00412 return NULL;
00413 EC_CLEANUP_END
00414 }
00415
00416 bool uem_register_signal(int signum, void *data)
00417 {
00418 struct uem_reg *p;
00419 sigset_t set;
00420
00421 ec_neg1( sigemptyset(&set) )
00422 ec_neg1( sigaddset(&set, signum) )
00423 ec_rv( pthread_sigmask(SIG_BLOCK, &set, NULL) )
00424 ec_null( p = new_reg() )
00425 p->ur_type = UEM_SIG;
00426 p->ur_resource.ur_signum = signum;
00427 p->ur_data = data;
00428 ec_rv( pthread_create(&p->ur_tid, NULL, thread_signal, p) )
00429 return true;
00430
00431 EC_CLEANUP_BGN
00432 return false;
00433 EC_CLEANUP_END
00434 }
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444 static void *thread_process(void *arg)
00445 {
00446 struct uem_event *e = NULL;
00447
00448 pthread_cleanup_push(cleanup_handler, e);
00449 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00450 e->ue_reg = (struct uem_reg *)arg;
00451 if (waitpid(e->ue_reg->ur_resource.ur_pid, &e->ue_result, 0) ==
00452 -1)
00453 e->ue_errno = errno;
00454 ec_false( queue_event(e) )
00455 pthread_cleanup_pop(false);
00456 return NULL;
00457
00458 EC_CLEANUP_BGN
00459 uem_free(e);
00460 EC_FLUSH("thread_process")
00461 return NULL;
00462 EC_CLEANUP_END
00463 }
00464
00465 bool uem_register_process(pid_t pid, void *data)
00466 {
00467 struct uem_reg *p;
00468
00469 ec_null( p = new_reg() )
00470 p->ur_type = UEM_PROCESS;
00471 p->ur_resource.ur_pid = pid;
00472 p->ur_size = 0;
00473 p->ur_data = data;
00474 ec_rv( pthread_create(&p->ur_tid, NULL, thread_process, p) )
00475 return true;
00476
00477 EC_CLEANUP_BGN
00478 return false;
00479 EC_CLEANUP_END
00480 }
00481
00482
00483 static void *thread_heartbeat(void *arg)
00484 {
00485 struct uem_event *e = NULL;
00486 struct timespec tspec;
00487
00488 pthread_cleanup_push(cleanup_handler, e);
00489 tspec.tv_sec = ((struct uem_reg *)arg)->ur_resource.ur_usecs / 1000000;
00490 tspec.tv_nsec = (((struct uem_reg *)arg)->ur_resource.ur_usecs %
00491 1000000) * 1000;
00492 while (true) {
00493 ec_null( e = calloc(1, sizeof(struct uem_event)) )
00494 e->ue_reg = (struct uem_reg *)arg;
00495
00496 ec_neg1( nanosleep(&tspec, NULL) )
00497 ec_false( queue_event(e) )
00498 }
00499 pthread_cleanup_pop(false);
00500 return NULL;
00501
00502 EC_CLEANUP_BGN
00503 uem_free(e);
00504 EC_FLUSH("thread_heartbeat")
00505 return NULL;
00506 EC_CLEANUP_END
00507 }
00508
00509 bool uem_register_heartbeat(long usecs, void *data)
00510 {
00511 struct uem_reg *p;
00512
00513 ec_null( p = new_reg() )
00514 p->ur_type = UEM_HEARTBEAT;
00515 p->ur_resource.ur_usecs = usecs;
00516 p->ur_size = 0;
00517 p->ur_data = data;
00518 ec_rv( pthread_create(&p->ur_tid, NULL, thread_heartbeat, p) )
00519 return true;
00520
00521 EC_CLEANUP_BGN
00522 return false;
00523 EC_CLEANUP_END
00524 }
00525
00526 struct uem_event *uem_wait(void)
00527 {
00528 struct uem_event *e = NULL;
00529
00530 ec_rv( pthread_mutex_lock(&uem_mtx) )
00531 while (event_head == NULL)
00532 ec_rv( pthread_cond_wait(&uem_cond_event, &uem_mtx) )
00533 e = event_head;
00534 event_head = event_head->ue_next;
00535 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00536 return e;
00537
00538 EC_CLEANUP_BGN
00539 (void)pthread_mutex_unlock(&uem_mtx);
00540 return NULL;
00541 EC_CLEANUP_END
00542 }
00543
00544 void uem_free(struct uem_event *e)
00545 {
00546 if (e != NULL) {
00547 switch (e->ue_reg->ur_type) {
00548 case UEM_SVMSG:
00549 free_svmsg(e);
00550 break;
00551 case UEM_PXMSG:
00552 break;
00553 case UEM_SVSEM:
00554 break;
00555 case UEM_PXSEM:
00556 break;
00557 case UEM_FD_READ:
00558 case UEM_FD_WRITE:
00559 case UEM_FD_ERROR:
00560 break;
00561 case UEM_SIG:
00562 break;
00563 case UEM_PROCESS:
00564 break;
00565 case UEM_HEARTBEAT:
00566 break;
00567 case UEM_NONE:
00568 break;
00569 }
00570 free(e);
00571 }
00572 }
00573
00574 bool uem_unregister(struct uem_event *e)
00575 {
00576 ec_rv( pthread_cancel(e->ue_reg->ur_tid) )
00577 ec_false( dequeue_event(e->ue_reg) )
00578 switch (e->ue_reg->ur_type) {
00579 case UEM_SVMSG:
00580 break;
00581 case UEM_PXMSG:
00582 break;
00583 case UEM_SVSEM:
00584 free_svsem_reg(e->ue_reg);
00585 break;
00586 case UEM_PXSEM:
00587 break;
00588 case UEM_FD_READ:
00589 case UEM_FD_WRITE:
00590 case UEM_FD_ERROR:
00591 break;
00592 case UEM_SIG:
00593 break;
00594 case UEM_PROCESS:
00595 break;
00596 case UEM_HEARTBEAT:
00597 break;
00598 case UEM_NONE:
00599 break;
00600 }
00601 free(e->ue_reg);
00602 printf("thread cancelled\n");
00603 return true;
00604
00605 EC_CLEANUP_BGN
00606 return false;
00607 EC_CLEANUP_END
00608 }
00609
00610 bool uem_bgn(void)
00611 {
00612 sigset_t set;
00613
00614 ec_neg1( sigemptyset(&set) )
00615 ec_neg1( sigfillset(&set) )
00616 ec_neg1( sigdelset(&set, SIGINT) )
00617 ec_rv( pthread_sigmask(SIG_SETMASK, &set, NULL) )
00618 return true;
00619
00620 EC_CLEANUP_BGN
00621 return false;
00622 EC_CLEANUP_END
00623 }
00624
/*
 * Shut down the event manager. Currently there is nothing to tear down,
 * so this always reports success.
 */
bool uem_end(void)
{
	const bool ok = true;

	return ok;
}