server/eventQueue.c

00001 /*
00002  * Copyright Ian Burnett 2005, 2006.
00003  *
00004  * This file is part of Ian's Interactive LCD controller (IILC).
00005  * 
00006  * IILC is free software; you can redistribute it and/or modify it under
00007  * the terms of the GNU General Public License as published by the Free
00008  * Software Foundation; either version 2 of the License, or (at your
00009  * option) any later version.
00010  *
00011  * IILC is distributed in the hope that it will be useful, but WITHOUT
00012  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00013  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
00014  * for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with IILC; if not, write to the Free Software Foundation, Inc.,
00018  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA.
00019  */
00020 
00021 #include <errno.h>
00022 #include <string.h>
00023 
00024 #if defined(_WIN32)
00025 #   include "pthread_w32.h"
00026 #   include "time_w32.h"
00027 #else
00028 #   include <sys/time.h>
00029 #endif
00030 
00031 #include "eventQueue.h"
00032 
00033 
00034 typedef struct _tag_event_queue_entry * LPEVENT_QUEUE_ENTRY;
00035 
00036 typedef struct _tag_event_queue_entry
00037 {
00038     /* Queue data */
00039     EVENT eventData;
00040 
00041     /* Previous event in this doubly-linked list */
00042     LPEVENT_QUEUE_ENTRY pPrevEvent;
00043 
00044     /* Next event in this doubly-linked list */
00045     LPEVENT_QUEUE_ENTRY pNextEvent;
00046 
00047     /* Time at which this event should dequeue */
00048     struct timeval dequeueTime;
00049 
00050 } EVENT_QUEUE_ENTRY;
00051 
00052 
00053 typedef struct _tag_event_queue
00054 {
00055     /* Queue mutex */
00056     pthread_mutex_t mtxQueue;
00057 
00058     /* Item enqueued sempahore */
00059     pthread_cond_t semAvailable;
00060 
00061     /* Front item in the queue */
00062     LPEVENT_QUEUE_ENTRY pFirst;
00063 
00064     /* Last item in the queue */
00065     LPEVENT_QUEUE_ENTRY pLast;
00066 
00067 } EVENT_QUEUE;
00068 
00069 
00070 
00071 int eventQueueCreate(LPEVENT_QUEUE * ppEventQueue)
00072 {
00073     int rc = 0;
00074 
00075     LPEVENT_QUEUE pEventQueue = NULL;
00076 
00077     pEventQueue = malloc(sizeof(EVENT_QUEUE));
00078 
00079     /* Create the mutex */
00080     rc = pthread_mutex_init(&(pEventQueue->mtxQueue), NULL);
00081     if (rc) {
00082         printf("Failed to create queue mutex: %s\n", strerror(errno));
00083         goto end_of_function;
00084     }
00085     
00086     /* Create the semaphore */
00087     rc = pthread_cond_init(&(pEventQueue->semAvailable), NULL);
00088     if (rc) {
00089         printf("Failed to create queue semaphore: %s\n", strerror(errno));
00090         goto end_of_function;
00091     }
00092 
00093     
00094     *ppEventQueue = pEventQueue;
00095 
00096 end_of_function:
00097 
00098     return 0;
00099 }
00100 
00101 int eventQueueDispose(LPEVENT_QUEUE * ppEventQueue)
00102 {
00103     LPEVENT_QUEUE pEventQueue = NULL;
00104     LPEVENT_QUEUE_ENTRY pCurrEntry = NULL;
00105 
00106     pEventQueue = *ppEventQueue;
00107 
00108     /* Lock the queue */
00109     pthread_mutex_lock(&(pEventQueue->mtxQueue));
00110 
00111     /* Walk across deleting everything */
00112     pCurrEntry = pEventQueue->pFirst;
00113     while (pCurrEntry != NULL) {
00114         free(pCurrEntry);
00115     }
00116 
00117     /* Clear the front & back */
00118     pEventQueue->pFirst = NULL;
00119     pEventQueue->pLast = NULL;
00120 
00121     /* Release the lock */
00122     pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00123 
00124     pthread_cond_destroy(&(pEventQueue->semAvailable));
00125 
00126     pthread_mutex_destroy(&(pEventQueue->mtxQueue));
00127 
00128     free(pEventQueue);
00129 
00130     *ppEventQueue = NULL;
00131 
00132     return 0;
00133 }
00134 
00135 
00136 int eventQueueEnqueue(LPEVENT_QUEUE pEventQueue, LPEVENT pEvent, unsigned int fireInMillis)
00137 {
00138     LPEVENT_QUEUE_ENTRY pNew = NULL;
00139     LPEVENT_QUEUE_ENTRY pCurr = NULL;
00140     //LPEVENT_QUEUE_ENTRY pNext = NULL;
00141     
00142     /* Create the entry (OUTSIDE the mutex lock) */
00143     pNew = malloc(sizeof(EVENT_QUEUE_ENTRY));
00144     pNew->pNextEvent = NULL;
00145     pNew->pPrevEvent = NULL;
00146     memcpy(&(pNew->eventData), pEvent, sizeof(EVENT));
00147 
00148     /* Set the time when this is to dequeue */
00149     gettimeofday(&(pNew->dequeueTime), NULL);
00150     
00151     /* Increment seconds */
00152     pNew->dequeueTime.tv_sec += fireInMillis / 1000;
00153 
00154     /* Increment milliseconds */
00155     pNew->dequeueTime.tv_usec += (fireInMillis % 1000) * 1000;
00156 
00157     /* Rationalise */
00158     if (pNew->dequeueTime.tv_usec > 1000000) {
00159         pNew->dequeueTime.tv_usec -= 1000000;
00160         pNew->dequeueTime.tv_sec  += 1;
00161     }
00162 
00163     /* Lock the queue mutex */
00164     pthread_mutex_lock(&(pEventQueue->mtxQueue));
00165 
00166     /* Get the item at the back of the queue */
00167     pCurr = pEventQueue->pLast;
00168     
00169     /* Now work forwards until we find one with the same or higher priority */
00170     while (pCurr != NULL) {
00171 
00172         struct timeval currDequeue = pCurr->dequeueTime;
00173 
00174         /* Decide if this to dequeue first */
00175         if ((currDequeue.tv_sec < pNew->dequeueTime.tv_sec) ||
00176             (currDequeue.tv_sec == pNew->dequeueTime.tv_sec && 
00177              currDequeue.tv_usec < pNew->dequeueTime.tv_usec)) {
00178 
00179             /* pCurr should dequeue before us - fall in line here */
00180             break;
00181         }
00182 
00183         /* Move forward in the queue */
00184         pCurr = pCurr->pPrevEvent;
00185     };
00186 
00187     /* Point the new item to the current one */
00188     pNew->pPrevEvent = pCurr;
00189 
00190     /* Are we at the front? */
00191     if (pCurr == NULL) {
00192 
00193         /* Point the new item to the original front item */
00194         pNew->pNextEvent = pEventQueue->pFirst;
00195 
00196         /* Point the queue front to new item */
00197         pEventQueue->pFirst = pNew;
00198     }
00199     else {
00200         /* Point the new one to the next one */
00201         pNew->pNextEvent = pCurr->pNextEvent;
00202 
00203         /* Point the current item to the new one */
00204         pCurr->pNextEvent = pNew;
00205     }
00206 
00207     /* Fix up the back of queue or the next queue item */
00208     if (pNew->pNextEvent == NULL) {
00209         pEventQueue->pLast = pNew; /* At back of queue */
00210     }
00211     else {
00212         pNew->pNextEvent->pPrevEvent = pNew;
00213     }
00214 
00215     /* Signal something in the queue has changed */
00216     pthread_cond_signal(&(pEventQueue->semAvailable));    
00217 
00218     /* Release the queue lock */
00219     pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00220 
00221     return 0;
00222 }
00223 
00224 
00231 int eventQueueDequeue(LPEVENT_QUEUE pEventQueue, LPEVENT pEvent)
00232 {
00233     int bGotLock = 0;
00234     LPEVENT_QUEUE_ENTRY pEntry = NULL;
00235     struct timeval dequeueTime;
00236     struct timeval currentTime;
00237     
00238     /* Lock the queue mutex */
00239     pthread_mutex_lock(&(pEventQueue->mtxQueue));
00240     bGotLock = 1;
00241 
00242     /* Wait until something is available at front of queue */
00243     while (pEventQueue->pFirst == NULL) {
00244         pthread_cond_wait(&(pEventQueue->semAvailable), &(pEventQueue->mtxQueue));
00245     }
00246 
00247     /* Now something at front of queue and we have lock */
00248     pEntry = pEventQueue->pFirst;
00249 
00250     /* Get the current time */
00251     gettimeofday(&currentTime, NULL);
00252 
00253     /* When do we dequeue this? */
00254     dequeueTime = pEntry->dequeueTime;
00255 
00256     /* Can we dequeue yet? */
00257     if ((dequeueTime.tv_sec > currentTime.tv_sec) ||
00258         (dequeueTime.tv_sec == currentTime.tv_sec && dequeueTime.tv_usec > currentTime.tv_usec))  {
00259 
00260         struct timespec timeout;
00261 
00262         /* Can't dequeue yet.
00263          * We need to wait until the required time has elapsed or
00264          * something new joins the queue with a more urgent need.
00265          */
00266 
00267         timeout.tv_sec  = dequeueTime.tv_sec;
00268         timeout.tv_nsec = dequeueTime.tv_usec * 1000;
00269 
00270         /* Perform the wait */
00271         pthread_cond_timedwait(&(pEventQueue->semAvailable), &(pEventQueue->mtxQueue), &timeout);
00272         
00273         /* Now either we've timed-out, or something more urgent arrived.
00274          * In any case, dequeue immediately and return to user
00275          */
00276     }
00277     else {
00278         /* OK to dequeue immediately */
00279     }
00280 
00281     /* Grab the fired event (don't forgot front of queue may have changed) */
00282     pEntry = pEventQueue->pFirst;
00283     memcpy(pEvent, &(pEntry->eventData), sizeof(EVENT));
00284 
00285     /* Fix up the front of queue */
00286     pEventQueue->pFirst = pEntry->pNextEvent;
00287     
00288     /* Fix up the first item */
00289     if (pEventQueue->pFirst != NULL) {
00290         pEventQueue->pFirst->pPrevEvent = NULL;
00291     }
00292     else {
00293         /* Nothing left in the queue - fix up back */
00294         pEventQueue->pLast = NULL;
00295     }
00296 
00297     /* Release the queue lock */
00298     pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00299 
00300     /* Release memory for this entry */
00301     free(pEntry);
00302 
00303 
00304     return 0;
00305 }
00306 
00307 

Generated on Mon Jul 17 01:36:11 2006 for IILC by  doxygen 1.4.6