00001 /* 00002 Unified Event Manager prototype 00003 AUP2, Sec. 5.18.2 00004 00005 Copyright 2003 by Marc J. Rochkind. All rights reserved. 00006 May be copied only for purposes and under conditions described 00007 on the Web page www.basepath.com/aup/copyright.htm. 00008 00009 The Example Files are provided "as is," without any warranty; 00010 without even the implied warranty of merchantability or fitness 00011 for a particular purpose. The author and his publisher are not 00012 responsible for any damages, direct or incidental, resulting 00013 from the use or non-use of these Example Files. 00014 00015 The Example Files may contain defects, and some contain deliberate 00016 coding mistakes that were included for educational reasons. 00017 You are responsible for determining if and how the Example Files 00018 are to be used. 00019 00020 */ 00021 #include "defs.h" 00022 #include "uem.h" 00023 #include <sys/msg.h> 00024 #include <sys/sem.h> 00025 #ifdef POSIX_IPC 00026 #include <mqueue.h> 00027 #endif 00028 /*[top]*/ 00029 static pthread_mutex_t uem_mtx = PTHREAD_MUTEX_INITIALIZER; 00030 static pthread_cond_t uem_cond_event = PTHREAD_COND_INITIALIZER; 00031 static struct uem_event *event_head; 00032 /*[new_reg]*/ 00033 static struct uem_reg *new_reg(void) 00034 { 00035 struct uem_reg *p; 00036 00037 ec_null( p = calloc(1, sizeof(struct uem_reg)) ) 00038 return p; 00039 00040 EC_CLEANUP_BGN 00041 return NULL; 00042 EC_CLEANUP_END 00043 } 00044 /*[]*/ 00045 /* Must contain no cancellation points. */ 00046 /*[queue_event]*/ 00047 static bool queue_event(struct uem_event *e) 00048 { 00049 struct uem_event *cur; 00050 00051 ec_rv( pthread_mutex_lock(&uem_mtx) ) 00052 if (event_head == NULL) 00053 event_head = e; 00054 else { 00055 for (cur = event_head; cur->ue_next != NULL; cur = cur->ue_next) 00056 /* queue same error only once */ 00057 if (e->ue_errno != 0 && 00058 cur->ue_reg->ur_type == e->ue_reg->ur_type && 00059 cur->ue_errno == e->ue_errno) { 00060 ec_rv( pthread_mutex_unlock(&uem_mtx) ) 00061 uem_free(e); 00062 return true; 00063 } 00064 cur->ue_next = e; 00065 } 00066 ec_rv( pthread_cond_signal(&uem_cond_event) ) 00067 ec_rv( pthread_mutex_unlock(&uem_mtx) ) 00068 return true; 00069 00070 EC_CLEANUP_BGN 00071 (void)pthread_mutex_unlock(&uem_mtx); 00072 return false; 00073 EC_CLEANUP_END 00074 } 00075 /*[dequeue_event]*/ 00076 static bool dequeue_event(struct uem_reg *p) 00077 { 00078 struct uem_event *cur, *prev, *next; 00079 00080 ec_rv( pthread_mutex_lock(&uem_mtx) ) 00081 for (cur = event_head; cur != NULL; cur = next) { 00082 next = cur->ue_next; 00083 if (cur->ue_reg == p) { 00084 if (prev == NULL) 00085 event_head = next; 00086 else 00087 prev->ue_next = next; 00088 uem_free(cur); 00089 } 00090 else 00091 prev = cur; 00092 } 00093 ec_rv( pthread_mutex_unlock(&uem_mtx) ) 00094 return true; 00095 00096 EC_CLEANUP_BGN 00097 (void)pthread_mutex_unlock(&uem_mtx); 00098 return false; 00099 EC_CLEANUP_END 00100 } 00101 /*[cleanup_handler]*/ 00102 static void cleanup_handler(void *arg) 00103 { 00104 (void)uem_free((struct uem_event *)arg); 00105 } 00106 /*[]*/ 00107 00108 static void free_svmsg(struct uem_event *e) 00109 { 00110 free(e->ue_buf); 00111 } 00112 /*[]*/ 00113 00114 static void *thread_svmsg(void *arg) 00115 { 00116 struct uem_event *e = NULL; 00117 00118 pthread_cleanup_push(cleanup_handler, e); 00119 while (true) { 00120 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00121 e->ue_reg = (struct uem_reg *)arg; 00122 ec_null( e->ue_buf = malloc(e->ue_reg->ur_size) ) 00123 if ((e->ue_result = msgrcv(e->ue_reg->ur_resource.ur_mqid, 00124 e->ue_buf, e->ue_reg->ur_size - sizeof(long), 0, 0)) == -1) 00125 e->ue_errno = errno; 00126 ec_false( queue_event(e) ) 00127 } 00128 pthread_cleanup_pop(false); 00129 return NULL; 00130 00131 EC_CLEANUP_BGN 00132 uem_free(e); 00133 EC_FLUSH("thread_svmsg") 00134 return NULL; 00135 EC_CLEANUP_END 00136 } 00137 00138 bool uem_register_svmsg(int mqid, size_t msgsize, void *data) 00139 { 00140 struct uem_reg *p; 00141 00142 ec_null( p = new_reg() ) 00143 p->ur_type = UEM_SVMSG; 00144 p->ur_resource.ur_mqid = mqid; 00145 p->ur_size = msgsize; 00146 p->ur_data = data; 00147 ec_rv( pthread_create(&p->ur_tid, NULL, thread_svmsg, p) ) 00148 return true; 00149 00150 EC_CLEANUP_BGN 00151 return false; 00152 EC_CLEANUP_END 00153 } 00154 00155 static void free_svsem_reg(struct uem_reg *p) 00156 { 00157 } 00158 00159 static void *thread_svsem(void *arg) 00160 { 00161 struct uem_event *e = NULL; 00162 00163 pthread_cleanup_push(cleanup_handler, e); 00164 while (true) { 00165 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00166 e->ue_reg = (struct uem_reg *)arg; 00167 if ((e->ue_result = semop(e->ue_reg->ur_resource.ur_svsem.s_semid, 00168 e->ue_reg->ur_resource.ur_svsem.s_sops, e->ue_reg->ur_size)) == 00169 -1) 00170 e->ue_errno = errno; 00171 ec_false( queue_event(e) ) 00172 } 00173 pthread_cleanup_pop(false); 00174 return NULL; 00175 00176 EC_CLEANUP_BGN 00177 uem_free(e); 00178 EC_FLUSH("thread_svsem") 00179 return NULL; 00180 EC_CLEANUP_END 00181 } 00182 00183 bool uem_register_svsem(int semid, struct sembuf *sops, size_t nsops, 00184 void *data) 00185 { 00186 struct uem_reg *p; 00187 00188 ec_null( p = new_reg() ) 00189 p->ur_type = UEM_SVSEM; 00190 p->ur_resource.ur_svsem.s_semid = semid; 00191 ec_null( p->ur_resource.ur_svsem.s_sops = 00192 calloc(1, sizeof(struct sembuf) * nsops) ) 00193 memcpy(p->ur_resource.ur_svsem.s_sops, sops, 00194 sizeof(struct sembuf) * nsops); 00195 p->ur_size = nsops; 00196 p->ur_data = data; 00197 ec_rv( pthread_create(&p->ur_tid, NULL, thread_svsem, p) ) 00198 return true; 00199 00200 EC_CLEANUP_BGN 00201 return false; 00202 EC_CLEANUP_END 00203 } 00204 00205 #ifdef POSIX_IPC 00206 static void *thread_pxmsg(void *arg) 00207 { 00208 struct uem_event *e = NULL; 00209 struct mq_attr attr; 00210 00211 pthread_cleanup_push(cleanup_handler, e); 00212 while (true) { 00213 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00214 e->ue_reg = (struct uem_reg *)arg; 00215 ec_neg1( mq_getattr(e->ue_reg->ur_resource.ur_mqd, &attr) ) 00216 ec_null( e->ue_buf = malloc(attr.mq_msgsize) ) 00217 if ((e->ue_result = mq_receive(e->ue_reg->ur_resource.ur_mqd, 00218 e->ue_buf, attr.mq_msgsize, NULL)) == -1) 00219 e->ue_errno = errno; 00220 ec_false( queue_event(e) ) 00221 } 00222 pthread_cleanup_pop(false); 00223 return NULL; 00224 00225 EC_CLEANUP_BGN 00226 uem_free(e); 00227 EC_FLUSH("thread_pxmsg") 00228 return NULL; 00229 EC_CLEANUP_END 00230 } 00231 00232 bool uem_register_pxmsg(mqd_t mqd, void *data) 00233 { 00234 struct uem_reg *p; 00235 00236 ec_null( p = new_reg() ) 00237 p->ur_type = UEM_PXMSG; 00238 p->ur_resource.ur_mqd = mqd; 00239 p->ur_data = data; 00240 ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxmsg, p) ) 00241 return true; 00242 00243 EC_CLEANUP_BGN 00244 return false; 00245 EC_CLEANUP_END 00246 } 00247 00248 static void *thread_pxsem(void *arg) 00249 { 00250 struct uem_event *e = NULL; 00251 00252 pthread_cleanup_push(cleanup_handler, e); 00253 while (true) { 00254 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00255 e->ue_reg = (struct uem_reg *)arg; 00256 if ((e->ue_result = sem_wait(e->ue_reg->ur_resource.ur_sem)) == -1) 00257 e->ue_errno = errno; 00258 ec_false( queue_event(e) ) 00259 } 00260 pthread_cleanup_pop(false); 00261 return NULL; 00262 00263 EC_CLEANUP_BGN 00264 uem_free(e); 00265 EC_FLUSH("thread_pxsem") 00266 return NULL; 00267 EC_CLEANUP_END 00268 } 00269 00270 bool uem_register_pxsem(sem_t *sem, void *data) 00271 { 00272 struct uem_reg *p; 00273 00274 ec_null( p = new_reg() ) 00275 p->ur_type = UEM_PXSEM; 00276 p->ur_resource.ur_sem = sem; 00277 p->ur_data = data; 00278 ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxsem, p) ) 00279 return true; 00280 00281 EC_CLEANUP_BGN 00282 return false; 00283 EC_CLEANUP_END 00284 } 00285 #endif 00286 00287 /* 00288 Problem is that this thread calls select in a loop, and select returns if the condition is met. It does not block waiting for something new to happen. Solutions: 00289 00290 1. Queue same event only once. Still causes a lot of wasted CPU. 00291 2. Sleep for a while. Causes poor responsiveness. 00292 3. Change the API to temporarily remove a ready file descriptor from the set. 00293 */ 00294 static void *thread_fdset(void *arg) 00295 { 00296 struct uem_event *e = NULL; 00297 struct uem_reg *p = (struct uem_reg *)arg; 00298 fd_set fdset, *fdset_read = NULL, *fdset_write = NULL, *fdset_error = NULL; 00299 int i; 00300 00301 pthread_cleanup_push(cleanup_handler, e); 00302 while (true) { 00303 /* 00304 Pre-allocate first event so that errno from select can be returned without an allocation. 00305 */ 00306 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00307 e->ue_reg = p; 00308 fdset = p->ur_resource.ur_fdset; 00309 switch(p->ur_type) { 00310 case UEM_FD_READ: 00311 fdset_read = &fdset; 00312 break; 00313 case UEM_FD_WRITE: 00314 fdset_write = &fdset; 00315 break; 00316 case UEM_FD_ERROR: 00317 fdset_error = &fdset; 00318 break; 00319 default: 00320 errno = EINVAL; 00321 EC_FAIL 00322 } 00323 if (select(p->ur_size, fdset_read, fdset_write, fdset_error, NULL) 00324 == -1) { 00325 e->ue_errno = errno; 00326 ec_false( queue_event(e) ) 00327 } 00328 else { 00329 for (i = 0; i < p->ur_size; i++) 00330 if (FD_ISSET(i, &fdset)) { 00331 struct uem_event *cur; 00332 00333 ec_rv( pthread_mutex_lock(&uem_mtx) ) 00334 for (cur = event_head; cur != NULL; cur = cur->ue_next) 00335 if (cur->ue_reg->ur_type == p->ur_type && 00336 cur->ue_result == i) 00337 break; 00338 ec_rv( pthread_mutex_unlock(&uem_mtx) ) 00339 if (cur != NULL) 00340 continue; 00341 if (e == NULL) { 00342 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00343 e->ue_reg = p; 00344 } 00345 e->ue_result = i; 00346 ec_false( queue_event(e) ) 00347 e = NULL; 00348 } 00349 } 00350 } 00351 pthread_cleanup_pop(false); 00352 return NULL; 00353 00354 EC_CLEANUP_BGN 00355 (void)pthread_mutex_unlock(&uem_mtx); 00356 uem_free(e); 00357 EC_FLUSH("thread_fdset") 00358 return NULL; 00359 EC_CLEANUP_END 00360 } 00361 00362 bool uem_register_fdset(int nfds, fd_set *fdset, enum UEM_TYPE type, 00363 void *data) 00364 { 00365 struct uem_reg *p; 00366 00367 switch (type) { 00368 case UEM_FD_READ: 00369 case UEM_FD_WRITE: 00370 case UEM_FD_ERROR: 00371 ec_null( p = new_reg() ) 00372 p->ur_type = type; 00373 p->ur_resource.ur_fdset = *fdset; 00374 p->ur_size = nfds; 00375 p->ur_data = data; 00376 ec_rv( pthread_create(&p->ur_tid, NULL, thread_fdset, p) ) 00377 return true; 00378 default: 00379 errno = EINVAL; 00380 EC_FAIL 00381 } 00382 00383 EC_CLEANUP_BGN 00384 return false; 00385 EC_CLEANUP_END 00386 } 00387 00388 static void *thread_signal(void *arg) 00389 { 00390 struct uem_event *e = NULL; 00391 sigset_t set; 00392 int signum; 00393 00394 pthread_cleanup_push(cleanup_handler, e); 00395 ec_neg1( sigemptyset(&set) ) 00396 ec_neg1( sigaddset(&set, 00397 ((struct uem_reg *)arg)->ur_resource.ur_signum) ) 00398 while (true) { 00399 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00400 e->ue_reg = (struct uem_reg *)arg; 00401 e->ue_errno = sigwait(&set, &signum); 00402 if (e->ue_errno == 0) 00403 e->ue_result = signum; 00404 ec_false( queue_event(e) ) 00405 } 00406 pthread_cleanup_pop(false); 00407 return NULL; 00408 00409 EC_CLEANUP_BGN 00410 uem_free(e); 00411 EC_FLUSH("thread_signal") 00412 return NULL; 00413 EC_CLEANUP_END 00414 } 00415 00416 bool uem_register_signal(int signum, void *data) 00417 { 00418 struct uem_reg *p; 00419 sigset_t set; 00420 00421 ec_neg1( sigemptyset(&set) ) 00422 ec_neg1( sigaddset(&set, signum) ) 00423 ec_rv( pthread_sigmask(SIG_BLOCK, &set, NULL) ) 00424 ec_null( p = new_reg() ) 00425 p->ur_type = UEM_SIG; 00426 p->ur_resource.ur_signum = signum; 00427 p->ur_data = data; 00428 ec_rv( pthread_create(&p->ur_tid, NULL, thread_signal, p) ) 00429 return true; 00430 00431 EC_CLEANUP_BGN 00432 return false; 00433 EC_CLEANUP_END 00434 } 00435 00436 /* 00437 Problem on systems that implemented threads as processes is that the pid that we want to wait on is not our child. 00438 00439 A way to implement this is to keep track of all the pids to be waited for and use the heartbeat event to wake up every so often to do waitpids for them without creating a thread. Left as an exercise. 00440 00441 What is here works fine on Solaris. 00442 */ 00443 /*[thread_process]*/ 00444 static void *thread_process(void *arg) 00445 { 00446 struct uem_event *e = NULL; 00447 00448 pthread_cleanup_push(cleanup_handler, e); 00449 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00450 e->ue_reg = (struct uem_reg *)arg; 00451 if (waitpid(e->ue_reg->ur_resource.ur_pid, &e->ue_result, 0) == 00452 -1) 00453 e->ue_errno = errno; 00454 ec_false( queue_event(e) ) 00455 pthread_cleanup_pop(false); 00456 return NULL; 00457 00458 EC_CLEANUP_BGN 00459 uem_free(e); 00460 EC_FLUSH("thread_process") 00461 return NULL; 00462 EC_CLEANUP_END 00463 } 00464 /*[uem_register_process]*/ 00465 bool uem_register_process(pid_t pid, void *data) 00466 { 00467 struct uem_reg *p; 00468 00469 ec_null( p = new_reg() ) 00470 p->ur_type = UEM_PROCESS; 00471 p->ur_resource.ur_pid = pid; 00472 p->ur_size = 0; 00473 p->ur_data = data; 00474 ec_rv( pthread_create(&p->ur_tid, NULL, thread_process, p) ) 00475 return true; 00476 00477 EC_CLEANUP_BGN 00478 return false; 00479 EC_CLEANUP_END 00480 } 00481 /*[]*/ 00482 00483 static void *thread_heartbeat(void *arg) 00484 { 00485 struct uem_event *e = NULL; 00486 struct timespec tspec; 00487 00488 pthread_cleanup_push(cleanup_handler, e); 00489 tspec.tv_sec = ((struct uem_reg *)arg)->ur_resource.ur_usecs / 1000000; 00490 tspec.tv_nsec = (((struct uem_reg *)arg)->ur_resource.ur_usecs % 00491 1000000) * 1000; 00492 while (true) { 00493 ec_null( e = calloc(1, sizeof(struct uem_event)) ) 00494 e->ue_reg = (struct uem_reg *)arg; 00495 00496 ec_neg1( nanosleep(&tspec, NULL) ) 00497 ec_false( queue_event(e) ) 00498 } 00499 pthread_cleanup_pop(false); 00500 return NULL; 00501 00502 EC_CLEANUP_BGN 00503 uem_free(e); 00504 EC_FLUSH("thread_heartbeat") 00505 return NULL; 00506 EC_CLEANUP_END 00507 } 00508 00509 bool uem_register_heartbeat(long usecs, void *data) 00510 { 00511 struct uem_reg *p; 00512 00513 ec_null( p = new_reg() ) 00514 p->ur_type = UEM_HEARTBEAT; 00515 p->ur_resource.ur_usecs = usecs; 00516 p->ur_size = 0; 00517 p->ur_data = data; 00518 ec_rv( pthread_create(&p->ur_tid, NULL, thread_heartbeat, p) ) 00519 return true; 00520 00521 EC_CLEANUP_BGN 00522 return false; 00523 EC_CLEANUP_END 00524 } 00525 /*[uem_wait]*/ 00526 struct uem_event *uem_wait(void) 00527 { 00528 struct uem_event *e = NULL; 00529 00530 ec_rv( pthread_mutex_lock(&uem_mtx) ) 00531 while (event_head == NULL) 00532 ec_rv( pthread_cond_wait(&uem_cond_event, &uem_mtx) ) 00533 e = event_head; 00534 event_head = event_head->ue_next; 00535 ec_rv( pthread_mutex_unlock(&uem_mtx) ) 00536 return e; 00537 00538 EC_CLEANUP_BGN 00539 (void)pthread_mutex_unlock(&uem_mtx); 00540 return NULL; 00541 EC_CLEANUP_END 00542 } 00543 /*[]*/ 00544 void uem_free(struct uem_event *e) 00545 { 00546 if (e != NULL) { 00547 switch (e->ue_reg->ur_type) { 00548 case UEM_SVMSG: 00549 free_svmsg(e); 00550 break; 00551 case UEM_PXMSG: 00552 break; 00553 case UEM_SVSEM: 00554 break; 00555 case UEM_PXSEM: 00556 break; 00557 case UEM_FD_READ: 00558 case UEM_FD_WRITE: 00559 case UEM_FD_ERROR: 00560 break; 00561 case UEM_SIG: 00562 break; 00563 case UEM_PROCESS: 00564 break; 00565 case UEM_HEARTBEAT: 00566 break; 00567 case UEM_NONE: 00568 break; 00569 } 00570 free(e); 00571 } 00572 } 00573 00574 bool uem_unregister(struct uem_event *e) 00575 { 00576 ec_rv( pthread_cancel(e->ue_reg->ur_tid) ) 00577 ec_false( dequeue_event(e->ue_reg) ) 00578 switch (e->ue_reg->ur_type) { 00579 case UEM_SVMSG: 00580 break; 00581 case UEM_PXMSG: 00582 break; 00583 case UEM_SVSEM: 00584 free_svsem_reg(e->ue_reg); 00585 break; 00586 case UEM_PXSEM: 00587 break; 00588 case UEM_FD_READ: 00589 case UEM_FD_WRITE: 00590 case UEM_FD_ERROR: 00591 break; 00592 case UEM_SIG: 00593 break; 00594 case UEM_PROCESS: 00595 break; 00596 case UEM_HEARTBEAT: 00597 break; 00598 case UEM_NONE: 00599 break; 00600 } 00601 free(e->ue_reg); 00602 printf("thread cancelled\n"); 00603 return true; 00604 00605 EC_CLEANUP_BGN 00606 return false; 00607 EC_CLEANUP_END 00608 } 00609 00610 bool uem_bgn(void) 00611 { 00612 sigset_t set; 00613 00614 ec_neg1( sigemptyset(&set) ) 00615 ec_neg1( sigfillset(&set) ) 00616 ec_neg1( sigdelset(&set, SIGINT) ) /* convenient for debugging */ 00617 ec_rv( pthread_sigmask(SIG_SETMASK, &set, NULL) ) 00618 return true; 00619 00620 EC_CLEANUP_BGN 00621 return false; 00622 EC_CLEANUP_END 00623 } 00624 00625 bool uem_end(void) 00626 { 00627 return true; 00628 }