©2004 by Marc J. Rochkind. All rights reserved. Portions marked "Open Source" may be copied under license.

 

Main Page   Modules   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

c7/uem.c

Go to the documentation of this file.
00001 /*
00002     Unified Event Manager prototype
00003     AUP2, Sec. 5.18.2
00004 
00005     Copyright 2003 by Marc J. Rochkind. All rights reserved.
00006     May be copied only for purposes and under conditions described
00007     on the Web page www.basepath.com/aup/copyright.htm.
00008 
00009     The Example Files are provided "as is," without any warranty;
00010     without even the implied warranty of merchantability or fitness
00011     for a particular purpose. The author and his publisher are not
00012     responsible for any damages, direct or incidental, resulting
00013     from the use or non-use of these Example Files.
00014 
00015     The Example Files may contain defects, and some contain deliberate
00016     coding mistakes that were included for educational reasons.
00017     You are responsible for determining if and how the Example Files
00018     are to be used.
00019 
00020 */
00021 #include "defs.h"
00022 #include "uem.h"
00023 #include <sys/msg.h>
00024 #include <sys/sem.h>
00025 #ifdef POSIX_IPC
00026 #include <mqueue.h>
00027 #endif
00028 /*[top]*/
/* Guards every access to the pending-event list headed by event_head. */
static pthread_mutex_t uem_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by queue_event() after an append; uem_wait() sleeps on it. */
static pthread_cond_t uem_cond_event = PTHREAD_COND_INITIALIZER;
/* First pending event (linked through ue_next); NULL when nothing is pending. */
static struct uem_event *event_head = NULL;
00032 /*[new_reg]*/
00033 static struct uem_reg *new_reg(void)
00034 {
00035     struct uem_reg *p;
00036 
00037     ec_null( p = calloc(1, sizeof(struct uem_reg)) )
00038     return p;
00039 
00040 EC_CLEANUP_BGN
00041     return NULL;
00042 EC_CLEANUP_END
00043 }
00044 /*[]*/
00045 /* Must contain no cancellation points. */
00046 /*[queue_event]*/
00047 static bool queue_event(struct uem_event *e)
00048 {
00049     struct uem_event *cur;
00050 
00051     ec_rv( pthread_mutex_lock(&uem_mtx) )
00052     if (event_head == NULL)
00053         event_head = e;
00054     else {
00055         for (cur = event_head; cur->ue_next != NULL; cur = cur->ue_next)
00056             /* queue same error only once */
00057             if (e->ue_errno != 0 &&
00058               cur->ue_reg->ur_type == e->ue_reg->ur_type &&
00059               cur->ue_errno == e->ue_errno) {
00060                 ec_rv( pthread_mutex_unlock(&uem_mtx) )
00061                 uem_free(e);
00062                 return true;
00063             }
00064         cur->ue_next = e;
00065     }
00066     ec_rv( pthread_cond_signal(&uem_cond_event) )
00067     ec_rv( pthread_mutex_unlock(&uem_mtx) )
00068     return true;
00069 
00070 EC_CLEANUP_BGN
00071     (void)pthread_mutex_unlock(&uem_mtx);
00072     return false;
00073 EC_CLEANUP_END
00074 }
00075 /*[dequeue_event]*/
00076 static bool dequeue_event(struct uem_reg *p)
00077 {
00078     struct uem_event *cur, *prev, *next;
00079 
00080     ec_rv( pthread_mutex_lock(&uem_mtx) )
00081     for (cur = event_head; cur != NULL; cur = next) {
00082         next = cur->ue_next;
00083         if (cur->ue_reg == p) {
00084             if (prev == NULL)
00085                 event_head = next;
00086             else
00087                 prev->ue_next = next;
00088             uem_free(cur);
00089         }
00090         else
00091             prev = cur;
00092     }
00093     ec_rv( pthread_mutex_unlock(&uem_mtx) )
00094     return true;
00095 
00096 EC_CLEANUP_BGN
00097     (void)pthread_mutex_unlock(&uem_mtx);
00098     return false;
00099 EC_CLEANUP_END
00100 }
00101 /*[cleanup_handler]*/
/*
    Cancellation cleanup routine installed by the event threads with
    pthread_cleanup_push: releases the event passed as arg through
    uem_free(), which ignores a NULL pointer.
*/
static void cleanup_handler(void *arg)
{
    struct uem_event *pending = arg;

    uem_free(pending);
}
00106 /*[]*/
00107 
00108 static void free_svmsg(struct uem_event *e)
00109 {
00110     free(e->ue_buf);
00111 }
00112 /*[]*/
00113 
00114 static void *thread_svmsg(void *arg)
00115 {
00116     struct uem_event *e = NULL;
00117 
00118     pthread_cleanup_push(cleanup_handler, e);
00119     while (true) {
00120         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00121         e->ue_reg = (struct uem_reg *)arg;
00122         ec_null( e->ue_buf = malloc(e->ue_reg->ur_size) )
00123         if ((e->ue_result = msgrcv(e->ue_reg->ur_resource.ur_mqid,
00124           e->ue_buf, e->ue_reg->ur_size - sizeof(long), 0, 0)) == -1)
00125             e->ue_errno = errno;
00126         ec_false( queue_event(e) )
00127     }
00128     pthread_cleanup_pop(false);
00129     return NULL;
00130 
00131 EC_CLEANUP_BGN
00132     uem_free(e);
00133     EC_FLUSH("thread_svmsg")
00134     return NULL;
00135 EC_CLEANUP_END
00136 }
00137 
00138 bool uem_register_svmsg(int mqid, size_t msgsize, void *data)
00139 {
00140     struct uem_reg *p;
00141 
00142     ec_null( p = new_reg() )
00143     p->ur_type = UEM_SVMSG;
00144     p->ur_resource.ur_mqid = mqid;
00145     p->ur_size = msgsize;
00146     p->ur_data = data;
00147     ec_rv( pthread_create(&p->ur_tid, NULL, thread_svmsg, p) )
00148     return true;
00149 
00150 EC_CLEANUP_BGN
00151     return false;
00152 EC_CLEANUP_END
00153 }
00154 
00155 static void free_svsem_reg(struct uem_reg *p)
00156 {
00157 }
00158 
00159 static void *thread_svsem(void *arg)
00160 {
00161     struct uem_event *e = NULL;
00162 
00163     pthread_cleanup_push(cleanup_handler, e);
00164     while (true) {
00165         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00166         e->ue_reg = (struct uem_reg *)arg;
00167         if ((e->ue_result = semop(e->ue_reg->ur_resource.ur_svsem.s_semid,
00168           e->ue_reg->ur_resource.ur_svsem.s_sops, e->ue_reg->ur_size)) ==
00169           -1)
00170             e->ue_errno = errno;
00171         ec_false( queue_event(e) )
00172     }
00173     pthread_cleanup_pop(false);
00174     return NULL;
00175 
00176 EC_CLEANUP_BGN
00177     uem_free(e);
00178     EC_FLUSH("thread_svsem")
00179     return NULL;
00180 EC_CLEANUP_END
00181 }
00182 
00183 bool uem_register_svsem(int semid, struct sembuf *sops, size_t nsops,
00184   void *data)
00185 {
00186     struct uem_reg *p;
00187 
00188     ec_null( p = new_reg() )
00189     p->ur_type = UEM_SVSEM;
00190     p->ur_resource.ur_svsem.s_semid = semid;
00191     ec_null( p->ur_resource.ur_svsem.s_sops =
00192       calloc(1, sizeof(struct sembuf) * nsops) )
00193     memcpy(p->ur_resource.ur_svsem.s_sops, sops,
00194       sizeof(struct sembuf) * nsops);
00195     p->ur_size = nsops;
00196     p->ur_data = data;
00197     ec_rv( pthread_create(&p->ur_tid, NULL, thread_svsem, p) )
00198     return true;
00199 
00200 EC_CLEANUP_BGN
00201     return false;
00202 EC_CLEANUP_END
00203 }
00204 
00205 #ifdef POSIX_IPC
00206 static void *thread_pxmsg(void *arg)
00207 {
00208     struct uem_event *e = NULL;
00209     struct mq_attr attr;
00210 
00211     pthread_cleanup_push(cleanup_handler, e);
00212     while (true) {
00213         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00214         e->ue_reg = (struct uem_reg *)arg;
00215         ec_neg1( mq_getattr(e->ue_reg->ur_resource.ur_mqd, &attr) )
00216         ec_null( e->ue_buf = malloc(attr.mq_msgsize) )
00217         if ((e->ue_result = mq_receive(e->ue_reg->ur_resource.ur_mqd,
00218           e->ue_buf, attr.mq_msgsize, NULL)) == -1)
00219             e->ue_errno = errno;
00220         ec_false( queue_event(e) )
00221     }
00222     pthread_cleanup_pop(false);
00223     return NULL;
00224 
00225 EC_CLEANUP_BGN
00226     uem_free(e);
00227     EC_FLUSH("thread_pxmsg")
00228     return NULL;
00229 EC_CLEANUP_END
00230 }
00231 
00232 bool uem_register_pxmsg(mqd_t mqd, void *data)
00233 {
00234     struct uem_reg *p;
00235 
00236     ec_null( p = new_reg() )
00237     p->ur_type = UEM_PXMSG;
00238     p->ur_resource.ur_mqd = mqd;
00239     p->ur_data = data;
00240     ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxmsg, p) )
00241     return true;
00242 
00243 EC_CLEANUP_BGN
00244     return false;
00245 EC_CLEANUP_END
00246 }
00247 
00248 static void *thread_pxsem(void *arg)
00249 {
00250     struct uem_event *e = NULL;
00251 
00252     pthread_cleanup_push(cleanup_handler, e);
00253     while (true) {
00254         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00255         e->ue_reg = (struct uem_reg *)arg;
00256         if ((e->ue_result = sem_wait(e->ue_reg->ur_resource.ur_sem)) == -1)
00257             e->ue_errno = errno;
00258         ec_false( queue_event(e) )
00259     }
00260     pthread_cleanup_pop(false);
00261     return NULL;
00262 
00263 EC_CLEANUP_BGN
00264     uem_free(e);
00265     EC_FLUSH("thread_pxsem")
00266     return NULL;
00267 EC_CLEANUP_END
00268 }
00269 
00270 bool uem_register_pxsem(sem_t *sem, void *data)
00271 {
00272     struct uem_reg *p;
00273 
00274     ec_null( p = new_reg() )
00275     p->ur_type = UEM_PXSEM;
00276     p->ur_resource.ur_sem = sem;
00277     p->ur_data = data;
00278     ec_rv( pthread_create(&p->ur_tid, NULL, thread_pxsem, p) )
00279     return true;
00280 
00281 EC_CLEANUP_BGN
00282     return false;
00283 EC_CLEANUP_END
00284 }
00285 #endif
00286 
00287 /*
00288     Problem is that this thread calls select in a loop, and select returns if the condition is met. It does not block waiting for something new to happen. Solutions:
00289 
00290     1. Queue same event only once. Still causes a lot of wasted CPU.
00291     2. Sleep for a while. Causes poor responsiveness.
00292     3. Change the API to temporarily remove a ready file descriptor from the set.
00293 */
00294 static void *thread_fdset(void *arg)
00295 {
00296     struct uem_event *e = NULL;
00297     struct uem_reg *p = (struct uem_reg *)arg;
00298     fd_set fdset, *fdset_read = NULL, *fdset_write = NULL, *fdset_error = NULL;
00299     int i;
00300 
00301     pthread_cleanup_push(cleanup_handler, e);
00302     while (true) {
00303         /*
00304             Pre-allocate first event so that errno from select can be returned without an allocation.
00305         */
00306         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00307         e->ue_reg = p;
00308         fdset = p->ur_resource.ur_fdset;
00309         switch(p->ur_type) {
00310         case UEM_FD_READ:
00311             fdset_read = &fdset;
00312             break;
00313         case UEM_FD_WRITE:
00314             fdset_write = &fdset;
00315             break;
00316         case UEM_FD_ERROR:
00317             fdset_error = &fdset;
00318             break;
00319         default:
00320             errno = EINVAL;
00321             EC_FAIL
00322         }
00323         if (select(p->ur_size, fdset_read, fdset_write, fdset_error, NULL)
00324           == -1) {
00325             e->ue_errno = errno;
00326             ec_false( queue_event(e) )
00327         }
00328         else {
00329             for (i = 0; i < p->ur_size; i++)
00330                 if (FD_ISSET(i, &fdset)) {
00331                     struct uem_event *cur;
00332 
00333                     ec_rv( pthread_mutex_lock(&uem_mtx) )
00334                     for (cur = event_head; cur != NULL; cur = cur->ue_next)
00335                         if (cur->ue_reg->ur_type == p->ur_type &&
00336                           cur->ue_result == i)
00337                             break;
00338                     ec_rv( pthread_mutex_unlock(&uem_mtx) )
00339                     if (cur != NULL)
00340                         continue;
00341                     if (e == NULL) {
00342                         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00343                         e->ue_reg = p;
00344                     }
00345                     e->ue_result = i;
00346                     ec_false( queue_event(e) )
00347                     e = NULL;
00348                 }
00349         }
00350     }
00351     pthread_cleanup_pop(false);
00352     return NULL;
00353 
00354 EC_CLEANUP_BGN
00355     (void)pthread_mutex_unlock(&uem_mtx);
00356     uem_free(e);
00357     EC_FLUSH("thread_fdset")
00358     return NULL;
00359 EC_CLEANUP_END
00360 }
00361 
00362 bool uem_register_fdset(int nfds, fd_set *fdset, enum UEM_TYPE type,
00363   void *data)
00364 {
00365     struct uem_reg *p;
00366 
00367     switch (type) {
00368     case UEM_FD_READ:
00369     case UEM_FD_WRITE:
00370     case UEM_FD_ERROR:
00371         ec_null( p = new_reg() )
00372         p->ur_type = type;
00373         p->ur_resource.ur_fdset = *fdset;
00374         p->ur_size = nfds;
00375         p->ur_data = data;
00376         ec_rv( pthread_create(&p->ur_tid, NULL, thread_fdset, p) )
00377         return true;
00378     default:
00379         errno = EINVAL;
00380         EC_FAIL
00381     }
00382 
00383 EC_CLEANUP_BGN
00384     return false;
00385 EC_CLEANUP_END
00386 }
00387 
00388 static void *thread_signal(void *arg)
00389 {
00390     struct uem_event *e = NULL;
00391     sigset_t set;
00392     int signum;
00393 
00394     pthread_cleanup_push(cleanup_handler, e);
00395     ec_neg1( sigemptyset(&set) )
00396     ec_neg1( sigaddset(&set,
00397       ((struct uem_reg *)arg)->ur_resource.ur_signum) )
00398     while (true) {
00399         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00400         e->ue_reg = (struct uem_reg *)arg;
00401         e->ue_errno = sigwait(&set, &signum);
00402         if (e->ue_errno == 0)
00403             e->ue_result = signum;
00404         ec_false( queue_event(e) )
00405     }
00406     pthread_cleanup_pop(false);
00407     return NULL;
00408 
00409 EC_CLEANUP_BGN
00410     uem_free(e);
00411     EC_FLUSH("thread_signal")
00412     return NULL;
00413 EC_CLEANUP_END
00414 }
00415 
00416 bool uem_register_signal(int signum, void *data)
00417 {
00418     struct uem_reg *p;
00419     sigset_t set;
00420 
00421     ec_neg1( sigemptyset(&set) )
00422     ec_neg1( sigaddset(&set, signum) )
00423     ec_rv( pthread_sigmask(SIG_BLOCK, &set, NULL) )
00424     ec_null( p = new_reg() )
00425     p->ur_type = UEM_SIG;
00426     p->ur_resource.ur_signum = signum;
00427     p->ur_data = data;
00428     ec_rv( pthread_create(&p->ur_tid, NULL, thread_signal, p) )
00429     return true;
00430 
00431 EC_CLEANUP_BGN
00432     return false;
00433 EC_CLEANUP_END
00434 }
00435 
00436 /*
00437     Problem on systems that implemented threads as processes is that the pid that we want to wait on is not our child.
00438 
00439     A way to implement this is to keep track of all the pids to be waited for and use the heartbeat event to wake up every so often to do waitpids for them without creating a thread. Left as an exercise.
00440 
00441     What is here works fine on Solaris.
00442 */
00443 /*[thread_process]*/
00444 static void *thread_process(void *arg)
00445 {
00446     struct uem_event *e = NULL;
00447 
00448     pthread_cleanup_push(cleanup_handler, e);
00449     ec_null( e = calloc(1, sizeof(struct uem_event)) )
00450     e->ue_reg = (struct uem_reg *)arg;
00451     if (waitpid(e->ue_reg->ur_resource.ur_pid, &e->ue_result, 0) ==
00452       -1)
00453         e->ue_errno = errno;
00454     ec_false( queue_event(e) )
00455     pthread_cleanup_pop(false);
00456     return NULL;
00457 
00458 EC_CLEANUP_BGN
00459     uem_free(e);
00460     EC_FLUSH("thread_process")
00461     return NULL;
00462 EC_CLEANUP_END
00463 }
00464 /*[uem_register_process]*/
00465 bool uem_register_process(pid_t pid, void *data)
00466 {
00467     struct uem_reg *p;
00468 
00469     ec_null( p = new_reg() )
00470     p->ur_type = UEM_PROCESS;
00471     p->ur_resource.ur_pid = pid;
00472     p->ur_size = 0;
00473     p->ur_data = data;
00474     ec_rv( pthread_create(&p->ur_tid, NULL, thread_process, p) )
00475     return true;
00476 
00477 EC_CLEANUP_BGN
00478     return false;
00479 EC_CLEANUP_END
00480 }
00481 /*[]*/
00482 
00483 static void *thread_heartbeat(void *arg)
00484 {
00485     struct uem_event *e = NULL;
00486     struct timespec tspec;
00487 
00488     pthread_cleanup_push(cleanup_handler, e);
00489     tspec.tv_sec = ((struct uem_reg *)arg)->ur_resource.ur_usecs / 1000000;
00490     tspec.tv_nsec = (((struct uem_reg *)arg)->ur_resource.ur_usecs %
00491       1000000) * 1000;
00492     while (true) {
00493         ec_null( e = calloc(1, sizeof(struct uem_event)) )
00494         e->ue_reg = (struct uem_reg *)arg;
00495 
00496         ec_neg1( nanosleep(&tspec, NULL) )
00497         ec_false( queue_event(e) )
00498     }
00499     pthread_cleanup_pop(false);
00500     return NULL;
00501 
00502 EC_CLEANUP_BGN
00503     uem_free(e);
00504     EC_FLUSH("thread_heartbeat")
00505     return NULL;
00506 EC_CLEANUP_END
00507 }
00508 
00509 bool uem_register_heartbeat(long usecs, void *data)
00510 {
00511     struct uem_reg *p;
00512 
00513     ec_null( p = new_reg() )
00514     p->ur_type = UEM_HEARTBEAT;
00515     p->ur_resource.ur_usecs = usecs;
00516     p->ur_size = 0;
00517     p->ur_data = data;
00518     ec_rv( pthread_create(&p->ur_tid, NULL, thread_heartbeat, p) )
00519     return true;
00520 
00521 EC_CLEANUP_BGN
00522     return false;
00523 EC_CLEANUP_END
00524 }
00525 /*[uem_wait]*/
00526 struct uem_event *uem_wait(void)
00527 {
00528     struct uem_event *e = NULL;
00529 
00530     ec_rv( pthread_mutex_lock(&uem_mtx) )
00531     while (event_head == NULL)
00532         ec_rv( pthread_cond_wait(&uem_cond_event, &uem_mtx) )
00533     e = event_head;
00534     event_head = event_head->ue_next;
00535     ec_rv( pthread_mutex_unlock(&uem_mtx) )
00536     return e;
00537 
00538 EC_CLEANUP_BGN
00539     (void)pthread_mutex_unlock(&uem_mtx);
00540     return NULL;
00541 EC_CLEANUP_END
00542 }
00543 /*[]*/
00544 void uem_free(struct uem_event *e)
00545 {
00546     if (e != NULL) {
00547         switch (e->ue_reg->ur_type) {
00548         case UEM_SVMSG:
00549             free_svmsg(e);
00550             break;
00551         case UEM_PXMSG:
00552             break;
00553         case UEM_SVSEM:
00554             break;
00555         case UEM_PXSEM:
00556             break;
00557         case UEM_FD_READ:
00558         case UEM_FD_WRITE:
00559         case UEM_FD_ERROR:
00560             break;
00561         case UEM_SIG:
00562             break;
00563         case UEM_PROCESS:
00564             break;
00565         case UEM_HEARTBEAT:
00566             break;
00567         case UEM_NONE:
00568             break;
00569         }
00570         free(e);
00571     }
00572 }
00573 
00574 bool uem_unregister(struct uem_event *e)
00575 {
00576     ec_rv( pthread_cancel(e->ue_reg->ur_tid) )
00577     ec_false( dequeue_event(e->ue_reg) )
00578     switch (e->ue_reg->ur_type) {
00579     case UEM_SVMSG:
00580         break;
00581     case UEM_PXMSG:
00582         break;
00583     case UEM_SVSEM:
00584         free_svsem_reg(e->ue_reg);
00585         break;
00586     case UEM_PXSEM:
00587         break;
00588     case UEM_FD_READ:
00589     case UEM_FD_WRITE:
00590     case UEM_FD_ERROR:
00591         break;
00592     case UEM_SIG:
00593         break;
00594     case UEM_PROCESS:
00595         break;
00596     case UEM_HEARTBEAT:
00597         break;
00598     case UEM_NONE:
00599         break;
00600     }
00601     free(e->ue_reg);
00602     printf("thread cancelled\n");
00603     return true;
00604 
00605 EC_CLEANUP_BGN
00606     return false;
00607 EC_CLEANUP_END
00608 }
00609 
00610 bool uem_bgn(void)
00611 {
00612     sigset_t set;
00613 
00614     ec_neg1( sigemptyset(&set) )
00615     ec_neg1( sigfillset(&set) )
00616     ec_neg1( sigdelset(&set, SIGINT) ) /* convenient for debugging */
00617     ec_rv( pthread_sigmask(SIG_SETMASK, &set, NULL) )
00618     return true;
00619 
00620 EC_CLEANUP_BGN
00621     return false;
00622 EC_CLEANUP_END
00623 }
00624 
/*
    Counterpart of uem_bgn().  Performs no work and always reports
    success.
*/
bool uem_end(void)
{
    const bool ok = true;

    return ok;
}

Generated on Fri Apr 23 10:57:01 2004 for AUP2 Example Source by doxygen 1.3.1