00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#if defined(_WIN32)
# include "pthread_w32.h"
# include "time_w32.h"
#else
# include <pthread.h>
# include <sys/time.h>
#endif

#include "eventQueue.h"
00032
00033
00034 typedef struct _tag_event_queue_entry * LPEVENT_QUEUE_ENTRY;
00035
00036 typedef struct _tag_event_queue_entry
00037 {
00038
00039 EVENT eventData;
00040
00041
00042 LPEVENT_QUEUE_ENTRY pPrevEvent;
00043
00044
00045 LPEVENT_QUEUE_ENTRY pNextEvent;
00046
00047
00048 struct timeval dequeueTime;
00049
00050 } EVENT_QUEUE_ENTRY;
00051
00052
00053 typedef struct _tag_event_queue
00054 {
00055
00056 pthread_mutex_t mtxQueue;
00057
00058
00059 pthread_cond_t semAvailable;
00060
00061
00062 LPEVENT_QUEUE_ENTRY pFirst;
00063
00064
00065 LPEVENT_QUEUE_ENTRY pLast;
00066
00067 } EVENT_QUEUE;
00068
00069
00070
00071 int eventQueueCreate(LPEVENT_QUEUE * ppEventQueue)
00072 {
00073 int rc = 0;
00074
00075 LPEVENT_QUEUE pEventQueue = NULL;
00076
00077 pEventQueue = malloc(sizeof(EVENT_QUEUE));
00078
00079
00080 rc = pthread_mutex_init(&(pEventQueue->mtxQueue), NULL);
00081 if (rc) {
00082 printf("Failed to create queue mutex: %s\n", strerror(errno));
00083 goto end_of_function;
00084 }
00085
00086
00087 rc = pthread_cond_init(&(pEventQueue->semAvailable), NULL);
00088 if (rc) {
00089 printf("Failed to create queue semaphore: %s\n", strerror(errno));
00090 goto end_of_function;
00091 }
00092
00093
00094 *ppEventQueue = pEventQueue;
00095
00096 end_of_function:
00097
00098 return 0;
00099 }
00100
00101 int eventQueueDispose(LPEVENT_QUEUE * ppEventQueue)
00102 {
00103 LPEVENT_QUEUE pEventQueue = NULL;
00104 LPEVENT_QUEUE_ENTRY pCurrEntry = NULL;
00105
00106 pEventQueue = *ppEventQueue;
00107
00108
00109 pthread_mutex_lock(&(pEventQueue->mtxQueue));
00110
00111
00112 pCurrEntry = pEventQueue->pFirst;
00113 while (pCurrEntry != NULL) {
00114 free(pCurrEntry);
00115 }
00116
00117
00118 pEventQueue->pFirst = NULL;
00119 pEventQueue->pLast = NULL;
00120
00121
00122 pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00123
00124 pthread_cond_destroy(&(pEventQueue->semAvailable));
00125
00126 pthread_mutex_destroy(&(pEventQueue->mtxQueue));
00127
00128 free(pEventQueue);
00129
00130 *ppEventQueue = NULL;
00131
00132 return 0;
00133 }
00134
00135
00136 int eventQueueEnqueue(LPEVENT_QUEUE pEventQueue, LPEVENT pEvent, unsigned int fireInMillis)
00137 {
00138 LPEVENT_QUEUE_ENTRY pNew = NULL;
00139 LPEVENT_QUEUE_ENTRY pCurr = NULL;
00140
00141
00142
00143 pNew = malloc(sizeof(EVENT_QUEUE_ENTRY));
00144 pNew->pNextEvent = NULL;
00145 pNew->pPrevEvent = NULL;
00146 memcpy(&(pNew->eventData), pEvent, sizeof(EVENT));
00147
00148
00149 gettimeofday(&(pNew->dequeueTime), NULL);
00150
00151
00152 pNew->dequeueTime.tv_sec += fireInMillis / 1000;
00153
00154
00155 pNew->dequeueTime.tv_usec += (fireInMillis % 1000) * 1000;
00156
00157
00158 if (pNew->dequeueTime.tv_usec > 1000000) {
00159 pNew->dequeueTime.tv_usec -= 1000000;
00160 pNew->dequeueTime.tv_sec += 1;
00161 }
00162
00163
00164 pthread_mutex_lock(&(pEventQueue->mtxQueue));
00165
00166
00167 pCurr = pEventQueue->pLast;
00168
00169
00170 while (pCurr != NULL) {
00171
00172 struct timeval currDequeue = pCurr->dequeueTime;
00173
00174
00175 if ((currDequeue.tv_sec < pNew->dequeueTime.tv_sec) ||
00176 (currDequeue.tv_sec == pNew->dequeueTime.tv_sec &&
00177 currDequeue.tv_usec < pNew->dequeueTime.tv_usec)) {
00178
00179
00180 break;
00181 }
00182
00183
00184 pCurr = pCurr->pPrevEvent;
00185 };
00186
00187
00188 pNew->pPrevEvent = pCurr;
00189
00190
00191 if (pCurr == NULL) {
00192
00193
00194 pNew->pNextEvent = pEventQueue->pFirst;
00195
00196
00197 pEventQueue->pFirst = pNew;
00198 }
00199 else {
00200
00201 pNew->pNextEvent = pCurr->pNextEvent;
00202
00203
00204 pCurr->pNextEvent = pNew;
00205 }
00206
00207
00208 if (pNew->pNextEvent == NULL) {
00209 pEventQueue->pLast = pNew;
00210 }
00211 else {
00212 pNew->pNextEvent->pPrevEvent = pNew;
00213 }
00214
00215
00216 pthread_cond_signal(&(pEventQueue->semAvailable));
00217
00218
00219 pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00220
00221 return 0;
00222 }
00223
00224
00231 int eventQueueDequeue(LPEVENT_QUEUE pEventQueue, LPEVENT pEvent)
00232 {
00233 int bGotLock = 0;
00234 LPEVENT_QUEUE_ENTRY pEntry = NULL;
00235 struct timeval dequeueTime;
00236 struct timeval currentTime;
00237
00238
00239 pthread_mutex_lock(&(pEventQueue->mtxQueue));
00240 bGotLock = 1;
00241
00242
00243 while (pEventQueue->pFirst == NULL) {
00244 pthread_cond_wait(&(pEventQueue->semAvailable), &(pEventQueue->mtxQueue));
00245 }
00246
00247
00248 pEntry = pEventQueue->pFirst;
00249
00250
00251 gettimeofday(¤tTime, NULL);
00252
00253
00254 dequeueTime = pEntry->dequeueTime;
00255
00256
00257 if ((dequeueTime.tv_sec > currentTime.tv_sec) ||
00258 (dequeueTime.tv_sec == currentTime.tv_sec && dequeueTime.tv_usec > currentTime.tv_usec)) {
00259
00260 struct timespec timeout;
00261
00262
00263
00264
00265
00266
00267 timeout.tv_sec = dequeueTime.tv_sec;
00268 timeout.tv_nsec = dequeueTime.tv_usec * 1000;
00269
00270
00271 pthread_cond_timedwait(&(pEventQueue->semAvailable), &(pEventQueue->mtxQueue), &timeout);
00272
00273
00274
00275
00276 }
00277 else {
00278
00279 }
00280
00281
00282 pEntry = pEventQueue->pFirst;
00283 memcpy(pEvent, &(pEntry->eventData), sizeof(EVENT));
00284
00285
00286 pEventQueue->pFirst = pEntry->pNextEvent;
00287
00288
00289 if (pEventQueue->pFirst != NULL) {
00290 pEventQueue->pFirst->pPrevEvent = NULL;
00291 }
00292 else {
00293
00294 pEventQueue->pLast = NULL;
00295 }
00296
00297
00298 pthread_mutex_unlock(&(pEventQueue->mtxQueue));
00299
00300
00301 free(pEntry);
00302
00303
00304 return 0;
00305 }
00306
00307