source: svn/trunk/zas_dstar/hal/os/src/v2lin/lmsgQLib.c @ 2

Last change on this file since 2 was 2, checked in by phkim, 11 years ago

1.phkim

  1. revision copy newcon3sk r27
File size: 35.4 KB
Line 
1/****************************************************************************
2 *^Copyright (C) 2004, 2005, 2006 v2lin Team <http://v2lin.sf.net>
3 *^Copyright (C) 2000,2001  Monta Vista Software Inc.
4 *
5 * This file is part of the v2lin Library.
6 * VxWorks is a registered trademark of Wind River Systems, Inc.
7 *
8 * Initial implementation Gary S. Robertson, 2000, 2001.
9 * Contributed by Andrew Skiba, skibochka@sourceforge.net, 2004.
10 * Contributed by Mike Kemelmakher, mike@ubxess.com, 2005.
11 * Contributed by Constantine Shulyupin, conan.sh@gmail.com, 2006.
12 *
13 * The v2lin library is free software; you can redistribute it and/or
14 * modify it under the terms of the GNU Lesser General Public
15 * License as published by the Free Software Foundation; either
16 * version 2.1 of the License, or (at your option) any later version.
17 *
18 * The v2lin Library is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21 * Lesser General Public License for more details.
22 *
23 ****************************************************************************/
24
25#include <errno.h>
26#include <unistd.h>
27#include <stdlib.h>
28#include <stdio.h>
29#include <string.h>
30#include <signal.h>
31#include <sys/time.h>
32#include "v2lpthread.h"
33#include "vxw_hdrs.h"
34#include "vxw_defs.h"
35#include "internal.h"
36#include "v2ldebug.h"
37#include "os_prive.h"
38
39#define SEND  0
40#define URGNT 1
41#define KILLD 2
42
43typedef struct q_msg
44{
45        uint msglen;
46        char *msgbuf;
47} q_msg_t;
48
49/*****************************************************************************
50**  Control block for v2pthread queue
51**
52**  The message list for a queue is organized into an array called an extent.
53**  Actual send and fetch operations are done using a queue_head and
54**  queue_tail pointer.  These pointers must 'rotate' through the extent to
55**  create a logical circular buffer.  A single extra location is added
56**  to ensure room for urgent messages even when the queue is 'full' for
57**  normal messages.
58**
59*****************************************************************************/
60typedef struct v2pt_mqueue
61{
62        // Mutex and Condition variable for queue send/pend
63        pthread_mutex_t queue_lock;
64        pthread_cond_t queue_send;
65
66        // Mutex and Condition variable for queue delete
67        pthread_mutex_t qdlet_lock;
68        pthread_cond_t qdlet_cmplt;
69
70        // Mutex and Condition variable for queue-full pend
71        pthread_mutex_t qfull_lock;
72        pthread_cond_t queue_space;
73
74        //  Pointer to next message pointer to be fetched from queue
75        q_msg_t *queue_head;
76
77        // Pointer to last message pointer sent to queue
78        q_msg_t *queue_tail;
79
80        int send_type; // Type of send operation last performed on queue
81
82        q_msg_t *first_msg_in_queue;
83        q_msg_t *last_msg_in_queue;
84
85        struct v2pt_mqueue *nxt_queue;
86
87        // First task control block in list of tasks waiting to receive a message from queue
88        DS_TASK_T *first_susp;
89
90        // First task control block in list of tasks waiting for space to post messages to queue
91        DS_TASK_T *first_write_susp;
92
93        int msg_count; // Total number of messages currently sent to queue
94        int msgs_per_queue; // Total (max) messages per queue
95        uint msg_len; // Maximum size of messages sent to queue
96        size_t vmsg_len; // sizeof( each element in queue ) used for subscript incr/decr.
97        int order; // Task pend order (FIFO or Priority) for queue
98} v2pt_mqueue_t;
99
100static v2pt_mqueue_t *mqueue_list;
101static pthread_mutex_t mqueue_list_lock = PTHREAD_MUTEX_INITIALIZER;
102
103int msgQShow(v2pt_mqueue_t * q , FILE * out, int mem)
104{
105        DS_TASK_T *t; 
106        int i;
107       
108        fprintf(out,"%x num=%i len=%i: ", (int)q, (int)q->msgs_per_queue, (int)q->msg_len);
109        fprintf(out,"readers: ");
110        i=0;
111        for ( t = q->first_susp; t; t = t->nxt_susp)   {
112                fprintf(out,"%x %s ", (int)t, t->name);
113                i++;
114                if ( i > 10 )  break;
115        }
116        i=0;
117        fprintf(out,"writers: ");
118        for ( t = q->first_write_susp; t; t = t->nxt_susp)  {
119                fprintf(out,"%x %s ", (int)t, t->name);
120                i++;
121                if ( i > 10 )  break;
122        }
123        if (mem) {
124                int * w;
125                for ( w = (int*)q; w < (int*) (q+1);w++) 
126                        fprintf(out,"%x ",*w);
127        }
128        fprintf(out,"\n");
129        q=q->nxt_queue;
130        return 0;
131}
132
133int msgQList(FILE * out,int mem)
134{
135        TRACEF();
136        int c=0;
137        v2pt_mqueue_t * q = mqueue_list;
138        while (q){
139                c++;
140                fprintf(out,"%i ",c);
141                msgQShow(q,out,mem);
142                q=q->nxt_queue;
143        }
144        return c;
145}
146
147
148/*****************************************************************************
149**  mqueue_find_lock - verifies whether the specified queue still exists, and if
150**                so, locks exclusive access to the queue for the caller.
151*****************************************************************************/
152static int mqueue_find_lock(v2pt_mqueue_t * queue)
153{
154        v2pt_mqueue_t *current_qcb;
155        int found_queue;
156        found_queue = FALSE;
157        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) &mqueue_list_lock);
158        pthread_mutex_lock(&mqueue_list_lock);
159        for (current_qcb = mqueue_list;
160                        current_qcb != NULL; current_qcb = current_qcb->nxt_queue) {
161                if (current_qcb == queue) {
162                        /*
163                         ** Lock mutex for queue access (it is assumed that a
164                         ** 'pthread_cleanup_push()' has already been performed
165                         **  by the caller in case of unexpected thread termination.)
166                         */
167                        pthread_mutex_lock(&queue->queue_lock);
168                        found_queue = TRUE;
169                        break;
170                }
171        }
172        pthread_cleanup_pop(1);
173        return found_queue;
174}
175
176/*****************************************************************************
177** link_qcb - appends a new queue control block pointer to the mqueue_list
178*****************************************************************************/
179static void link_qcb(v2pt_mqueue_t * new_mqueue)
180{
181        v2pt_mqueue_t **i;
182
183        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) &mqueue_list_lock);
184        pthread_mutex_lock(&mqueue_list_lock);
185
186        i = & mqueue_list; 
187        while (*i) i = & (*i)->nxt_queue; // find tail
188        new_mqueue->nxt_queue = NULL;
189        *i = new_mqueue;
190        TRACEF("add queue cb @ %p ",new_mqueue);
191        pthread_mutex_unlock(&mqueue_list_lock);
192        pthread_cleanup_pop(0);
193}
194
195/*****************************************************************************
196** unlink_qcb - removes a queue control block pointer from the mqueue_list
197*****************************************************************************/
198static v2pt_mqueue_t *unlink_qcb(v2pt_mqueue_t * qid)
199{
200        v2pt_mqueue_t **i = &mqueue_list;
201        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) &mqueue_list_lock);
202        pthread_mutex_lock(&mqueue_list_lock);
203
204        while (*i) {
205                if ( *i == qid ) {
206                        TRACEF("%p", qid);
207                        *i = (*i)->nxt_queue;   // remove
208                        break;
209                }
210                i = &((*i)->nxt_queue);
211        }
212        pthread_cleanup_pop(1);
213        return qid;
214}
215
216/*****************************************************************************
217** urgent_msg_to - sends a message to the front of the specified queue
218*****************************************************************************/
219static void urgent_msg_to(v2pt_mqueue_t * queue, char *msg, uint msglen)
220{
221        uint i;
222        char *element;
223        TRACEF();
224        /*
225         **  It is assumed when we enter this function that the queue has space
226         **  to accept the message about to be sent.
227         **  Pre-decrement the queue_head (fetch) pointer, adjusting for
228         **  possible wrap to the end of the queue;
229         **  (Urgent messages are placed at the queue head so they will be the
230         **  next message fetched from the queue - ahead of any
231         **  previously-queued messages.)
232         */
233        element = (char *) queue->queue_head;
234        element -= queue->vmsg_len;
235        queue->queue_head = (q_msg_t *) element;
236
237        if (queue->queue_head < queue->first_msg_in_queue) {
238                /*
239                 **  New queue_head pointer underflowed beginning of the extent...
240                 **  Wrap the queue_head pointer to the last message address
241                 **  in the extent allocated for the queue.
242                 */
243                queue->queue_head = queue->last_msg_in_queue;
244        }
245        TRACEF(" new queue_head @ %p", queue->queue_head);
246
247        if (msg != (char *) NULL) {
248                element = (char *) &((queue->queue_head)->msgbuf);
249                for (i = 0; i < msglen; i++) {
250                        *(element + i) = *(msg + i);
251                }
252        }
253        (queue->queue_head)->msglen = msglen;
254
255        TRACEF("nsent urgent msg %p len %x to queue_head @ %p", msg, msglen, queue->queue_head);
256        queue->msg_count++;
257
258        queue->send_type = URGNT;
259}
260
261/*****************************************************************************
262** send_msg_to - sends the specified message to the tail of the specified queue
263*****************************************************************************/
264static void send_msg_to(v2pt_mqueue_t * queue, char *msg, uint msglen)
265{
266        uint i;
267        char *element;
268        TRACEF();
269        /*
270         **  It is assumed when we enter this function that the queue has space
271         **  to accept the message about to be sent.  Start by sending the
272         **  message.
273         */
274        if (msg != (char *) NULL) {
275                element = (char *) &((queue->queue_tail)->msgbuf);
276                for (i = 0; i < msglen; i++) {
277                        *(element + i) = *(msg + i);
278                }
279        }
280        queue->queue_tail->msglen = msglen;
281
282        TRACEF("%x len %x to queue_tail @ %p", msg, msglen, queue->queue_tail);
283
284        /*
285         **  Now increment the queue_tail (send) pointer, adjusting for
286         **  possible wrap to the beginning of the queue.
287         */
288        element = (char *) queue->queue_tail;
289        element += queue->vmsg_len;
290        queue->queue_tail = (q_msg_t *) element;
291
292        if (queue->queue_tail > queue->last_msg_in_queue) {
293                /*
294                 **  Wrap the queue_tail pointer to the first message address
295                 **  in the queue.
296                 */
297                queue->queue_tail = queue->first_msg_in_queue;
298        }
299        TRACEF(" new queue_tail @ %p", queue->queue_tail);
300
301        queue->msg_count++;
302        queue->send_type = SEND;
303       
304        if ( queue->first_susp )
305            pthread_cond_broadcast(&(queue->queue_send));
306}
307
308/*****************************************************************************
309** notify_if_delete_complete - indicates if all tasks waiting on specified
310**                             queue have successfully been awakened.
311*****************************************************************************/
312static void notify_if_delete_complete(v2pt_mqueue_t * queue)
313{
314        /*
315         **  All tasks pending on the specified queue are being awakened...
316         **  If the calling task was the last task pending on the queue,
317         **  signal the deletion-complete condition variable.
318         */
319        if (( ! queue->first_susp ) && ( ! queue->first_write_susp )) {
320                // Lock mutex for queue delete completion
321                pthread_mutex_clean_lock(&(queue->qdlet_lock));
322
323                // Signal the deletion-complete condition variable for the queue
324                pthread_cond_broadcast(&queue->qdlet_cmplt);
325
326                // Unlock the queue delete completion mutex.
327                pthread_mutex_unlock(&queue->qdlet_lock);
328                pthread_cleanup_pop(0);
329        }
330}
331
332
333/*****************************************************************************
334** fetch_msg_from - fetches the next message from the specified queue
335*****************************************************************************/
336static uint fetch_msg_from(v2pt_mqueue_t * queue, char *msg)
337{
338        char *element;
339        uint i;
340        uint msglen;
341
342        /*
343         **  It is assumed when we enter this function that the queue contains
344         **  one or more messages to be fetched.
345         **  Fetch the message from the queue_head message location.
346         */
347        if (msg != (char *) NULL) {
348                element = (char *) &((queue->queue_head)->msgbuf);
349                msglen = (queue->queue_head)->msglen;
350                for (i = 0; i < msglen; i++) {
351                        *(msg + i) = *(element + i);
352                }
353        } else
354                msglen = 0;
355
356        TRACEF("fetched msg of len %x from queue_head @ %p", msglen, queue->queue_head);
357
358        /*
359         **  Clear the message from the queue
360         */
361        element = (char *) &((queue->queue_head)->msgbuf);
362        *element = (char) 0;
363        (queue->queue_head)->msglen = 0;
364
365        /*
366         **  Now increment the queue_head (send) pointer, adjusting for
367         **  possible wrap to the beginning of the queue.
368         */
369        element = (char *) queue->queue_head;
370        element += queue->vmsg_len;
371        queue->queue_head = (q_msg_t *) element;
372
373        if (queue->queue_head > queue->last_msg_in_queue) {
374                /*
375                 **  New queue_head pointer overflowed end of queue...
376                 **  Wrap the queue_head pointer to the first message address
377                 **  in the queue.
378                 */
379                queue->queue_head = queue->first_msg_in_queue;
380        }
381        TRACEF(" new queue_head @ %p", queue->queue_head);
382
383        queue->msg_count--;
384
385        /*
386         **  Now see if adequate space was freed in the queue and alert any tasks
387         **  waiting for message space if adequate space now exists.
388         */
389        if (queue->first_write_susp != (DS_TASK_T *) NULL) {
390                if (queue->msg_count <= (queue->msgs_per_queue - 1)) {
391
392                        TRACEF("\r\nqueue @ %p freed msg space for queue list @ %p",
393                                   queue, &(queue->first_write_susp));
394                        /*
395                         **  Lock mutex for queue space
396                         */
397                        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
398                                                                 (void *) &(queue->qfull_lock));
399                        pthread_mutex_lock(&(queue->qfull_lock));
400
401                        /*
402                         **  Alert the waiting tasks that message space is available.
403                         */
404                        pthread_cond_broadcast(&(queue->queue_space));
405
406                        /*
407                         **  Unlock the queue space mutex.
408                         */
409                        pthread_cleanup_pop(1);
410                }
411        }
412        return (msglen);
413}
414
415/*****************************************************************************
416** data_extent_for - allocates space for queue data.  Data is allocated in
417**                  a block large enough to hold (max_msgs + 1) messages.
418*****************************************************************************/
419static q_msg_t *data_extent_for(v2pt_mqueue_t * queue)
420{
421        char *new_extent;
422        char *last_msg;
423        size_t alloc_size;
424
425        /*
426         **  Calculate the number of bytes of memory needed for this extent.
427         **  Start by calculating the size of each element of the extent array.
428         **  Each (q_msg_t) element will contain an unsigned int byte length followed
429         **  by a character array of queue->msg_len bytes.  First get the size
430         **  of the q_msg_t 'header' excluding the start of the data array.
431         **  Then add the size of the maximum-length message data.
432         */
433        queue->vmsg_len = sizeof(q_msg_t) - sizeof(char *);
434        queue->vmsg_len += (sizeof(char) * queue->msg_len);
435
436        /*
437         **  The size of each array element is now known...
438         **  Multiply it by the number of elements to get allocation size.
439         */
440        alloc_size = queue->vmsg_len * (queue->msgs_per_queue + 1);
441
442        /*
443         **  Now allocate a block of memory to contain the extent.
444         */
445        if ((new_extent = (char *) malloc(alloc_size)) != (char *) NULL) {
446                /*
447                 **  Clear the memory block.  Note that this creates a NULL pointer
448                 **  for the nxt_extent link as well as zeroing the message array.
449                 */
450                bzero((void *) new_extent, (int) alloc_size);
451
452                /*
453                 **  Link new data extent into the queue control block
454                 */
455                last_msg = new_extent + (queue->vmsg_len * queue->msgs_per_queue);
456                queue->first_msg_in_queue = (q_msg_t *) new_extent;
457                queue->last_msg_in_queue = (q_msg_t *) last_msg;
458        }
459        TRACEF("new extent @ %p for queue @ %p vmsg_len %x", new_extent, queue, queue->vmsg_len);
460        return ((q_msg_t *) new_extent);
461}
462
463/*****************************************************************************
464** msgQCreate - creates a v2pthread message queue
465*****************************************************************************/
466MSG_Q_ID msgQCreate(int max_msgs, int msglen, int opt)
467{
468        v2pt_mqueue_t *queue;
469        STATUS error;
470        TRACEF("%i %i %x",max_msgs,msglen,opt);
471        error = OK;
472        queue = (v2pt_mqueue_t *) malloc(sizeof(v2pt_mqueue_t));
473        if (!queue) {
474
475                error = S_memLib_NOT_ENOUGH_MEMORY;
476                goto exit;
477        }
478        memset(queue,0,sizeof(*queue));
479        queue->msgs_per_queue = max_msgs;
480
481        queue->msg_len = msglen;
482
483        if ( data_extent_for(queue) ) {
484
485                /*
486                 ** Mutex and Condition variable for queue send/pend
487                 */
488                pthread_mutex_init(&(queue->queue_lock), (pthread_mutexattr_t *) NULL);
489                pthread_cond_init(&(queue->queue_send), (pthread_condattr_t *) NULL);
490
491                /*
492                 ** Mutex and Condition variable for queue delete
493                 */
494                pthread_mutex_init(&(queue->qdlet_lock), (pthread_mutexattr_t *) NULL);
495                pthread_cond_init(&(queue->qdlet_cmplt), (pthread_condattr_t *) NULL);
496
497                /*
498                 ** Mutex and Condition variable for queue-full pend
499                 */
500                pthread_mutex_init(&(queue->qfull_lock), (pthread_mutexattr_t *) NULL);
501                pthread_cond_init(&(queue->queue_space), (pthread_condattr_t *) NULL);
502
503                /*
504                 ** Pointer to next message pointer to be fetched from queue
505                 */
506                queue->queue_head = queue->first_msg_in_queue;
507
508                /*
509                 ** Pointer to last message pointer sent to queue
510                 */
511                queue->queue_tail = queue->first_msg_in_queue;
512
513                /*
514                 ** Type of send operation last performed on queue
515                 */
516                queue->send_type = SEND;
517
518                queue->first_susp = NULL;
519
520                /*
521                 ** First task control block in list of tasks waiting for space to
522                 ** post messages to queue
523                 */
524                queue->first_write_susp = (DS_TASK_T *) NULL;
525
526                /*
527                 ** Total number of messages currently sent to queue
528                 */
529                queue->msg_count = 0;
530
531                /*
532                 ** Task pend order (FIFO or Priority) for queue
533                 */
534                if (opt & MSG_Q_PRIORITY)
535                        queue->order = 1;
536                else
537                        queue->order = 0;
538
539                /*
540                 **  If no errors thus far, we have a new queue ready to link into
541                 **  the queue list.
542                 */
543                if (error == OK) {
544                        link_qcb(queue);
545                } else {
546                        /*
547                         **  Oops!  Problem somewhere above.  Release control block
548                         **  and data memory and return.
549                         */
550                        free((void *) queue->first_msg_in_queue);
551                        free((void *) queue);
552                }
553        } else {
554                /*
555                 **  No memory for queue data... free queue control block & return
556                 */
557                free((void *) queue);
558                error = S_memLib_NOT_ENOUGH_MEMORY;
559        }
560exit:
561        if (error != OK) {
562                errno = (int) error;
563                queue = (v2pt_mqueue_t *) NULL;
564        }
565
566        return (MSG_Q_ID)queue;
567}
568
569/*****************************************************************************
570** waiting_on_q_space - returns a nonzero result unless a qualifying event
571**                      occurs on the specified queue which should cause the
572**                      pended task to be awakened.  The qualifying events
573**                      are:
574**                          1. message space is freed in the queue and the
575**                              current task is selected to receive it
576**                          2. the queue is deleted
577*****************************************************************************/
578static int waiting_on_q_space(v2pt_mqueue_t * queue, struct timespec *timeout, int *retcode)
579{
580        int result;
581        struct timeval now;
582        unsigned long usec;
583        TRACEF();
584        if (queue->send_type & KILLD) {
585                result = 0;
586                *retcode = 0;
587        } else {
588                /*
589                 **  Queue still in service... check for message space availability.
590                 **  Initially assume no message space available for our task
591                 */
592                result = 1;
593
594                /*
595                 **  Multiple messages removed from the queue may be represented by
596                 **  only a single signal to the condition variable, so continue
597                 **  checking for a message slot for our task as long as more space
598                 **  is available.  Also note that for a 'zero-length' queue, the
599                 **  presence of a task waiting on the queue for our message will
600                 **  allow our message to be posted to the queue.
601                 */
602                while ((queue->msg_count <= (queue->msgs_per_queue - 1)) ||
603                           ((queue->msgs_per_queue == 0) && queue->first_susp )) {
604                        // Message slot available... see if it's for our task.
605                        if (signal_for_my_task(&queue->first_write_susp, queue->order)) {
606                                /*
607                                 **  Message slot was destined for our task... waiting is over.
608                                 */
609                                result = 0;
610                                *retcode = 0;
611                                break;
612                        } else {
613                                /*
614                                 **  Message slot isn't for our task... continue waiting.
615                                 **  Sleep awhile to allow other tasks ahead of ours in the
616                                 **  list of tasks waiting on the queue to get their
617                                 **  messages, bringing our task to the head of the list.
618                                 */
619                                pthread_mutex_unlock(&(queue->qfull_lock));
620                                OS_Delay(1);
621                                pthread_mutex_lock(&(queue->qfull_lock));
622                        }
623
624                        /*
625                         **  If a timeout was specified, make sure we respect it and
626                         **  exit this loop if it expires.
627                         */
628                        if (timeout != (struct timespec *) NULL) {
629                                gettimeofday(&now, (struct timezone *) NULL);
630                                if (timeout->tv_nsec > (now.tv_usec * 1000)) {
631                                        usec = (timeout->tv_nsec - (now.tv_usec * 1000)) / 1000;
632                                        if (timeout->tv_sec < now.tv_sec)
633                                                usec = 0;
634                                        else
635                                                usec += ((timeout->tv_sec - now.tv_sec) * 1000000);
636                                } else {
637                                        usec = ((timeout->tv_nsec + 1000000000) - (now.tv_usec * 1000)) / 1000;
638                                        if ((timeout->tv_sec - 1) < now.tv_sec)
639                                                usec = 0;
640                                        else
641                                                usec += (((timeout->tv_sec - 1) - now.tv_sec)
642                                                                 * 1000000);
643                                }
644                                if (usec == 0)
645                                        break;
646                        }
647                }
648        }
649
650        return result;
651}
652
653/*****************************************************************************
654** waitToSend - sends the queue message if sufficient space becomes available
655**              within the allotted waiting interval.
656*****************************************************************************/
657STATUS waitToSend(v2pt_mqueue_t * queue, char *msg, uint msglen, int wait, int pri)
658{
659        DS_TASK_T *our_task;
660        struct timeval now;
661        struct timespec timeout;
662        int retcode;
663        long sec, usec;
664        STATUS error = OK;
665        TRACEF();
666
667        if (wait != NO_WAIT) {
668                //  Add task for task to list of tasks waiting on queue
669                our_task = taskFind(0, 1);
670                TRACEV("%x", queue->first_write_susp);
671
672                link_susp_task(&queue->first_write_susp, our_task);
673
674                retcode = 0;
675
676                //  Unlock the queue mutex so other tasks can receive messages.
677                pthread_mutex_unlock(&queue->queue_lock);
678
679                if (wait == WAIT_FOREVER) {
680                        while (waiting_on_q_space(queue, 0, &retcode)) {
681                                pthread_cond_wait(&queue->queue_space, &queue->qfull_lock);
682                        }
683                } else {
684                        /*
685                         **  Wait on queue message space with timeout...
686                         **  Calculate timeout delay in seconds and microseconds.
687                         */
688                        sec = 0;
689                        usec = wait * V2PT_TICK * 1000;
690                        gettimeofday(&now, (struct timezone *) NULL);
691                        usec += now.tv_usec;
692                        if (usec > 1000000) {
693                                sec = usec / 1000000;
694                                usec = usec % 1000000;
695                        }
696                        timeout.tv_sec = now.tv_sec + sec;
697                        timeout.tv_nsec = usec * 1000;
698
699                        /*
700                         **  Wait for queue message space for the current task or for the
701                         **  timeout to expire.  The loop is required since the task
702                         **  may be awakened by signals for messages which are
703                         **  not ours, or for signals other than from a message send.
704                         */
705                        while ((waiting_on_q_space(queue, &timeout, &retcode)) && (retcode != ETIMEDOUT)) {
706                                retcode = pthread_cond_timedwait(&queue->queue_space,
707                                                &queue->qfull_lock, &timeout);
708                        }
709                }
710
711                /*
712                 **  Re-lock the queue mutex before manipulating its control block.
713                 */
714                pthread_mutex_lock(&(queue->queue_lock));
715
716                /*
717                 **  Remove the calling task's task from the pended task list
718                 **  for the queue.  Clear our TCB's suspend list pointer in
719                 **  case the queue was killed & its ctrl blk deallocated.
720                 */
721                unlink_susp_task(&(queue->first_write_susp), our_task);
722                //our_task->suspend_list = NULL;
723
724                /*
725                 **  See if we were awakened due to a msgQDelete on the queue.
726                 */
727                if (queue->send_type & KILLD) {
728                        notify_if_delete_complete(queue);
729                        error = S_objLib_OBJ_DELETED;
730                        TRACEF("...queue deleted");
731                } else {
732                        /*
733                         **  See if we timed out or if we got a message slot
734                         */
735                        if (retcode == ETIMEDOUT) {
736                                /*
737                                 **  Timed out without obtaining a message slot
738                                 */
739                                error = S_objLib_OBJ_TIMEOUT;
740                                TRACEF("...timed out");
741                        } else {
742                                /*
743                                 **  A message slot was freed on the queue for this task...
744                                 */
745                                TRACEF("...rcvd queue msg space");
746
747                                if (pri == MSG_PRI_URGENT) {
748                                        /*
749                                         **  Stuff the new message onto the front of the queue.
750                                         */
751                                        urgent_msg_to(queue, msg, msglen);
752
753                                        /*
754                                         **  Signal the condition variable for the queue
755                                         */
756                                        pthread_cond_broadcast(&(queue->queue_send));
757                                } else
758                                        /*
759                                         **  Send the new message to the back of the queue.
760                                         */
761                                        send_msg_to(queue, msg, msglen);
762                        }
763                }
764        } else {
765                /*
766                 **  Queue is full and no waiting allowed... return QUEUE FULL error
767                 */
768                error = S_objLib_OBJ_UNAVAILABLE;
769                TRACEF("WARNING: queue is full");
770        }
771        return (error);
772}
773
774/*****************************************************************************
775** msgQSend - posts a message to the tail of a v2pthread queue and awakens the
776**            first selected task pended on the queue.
777*****************************************************************************/
778STATUS msgQSend(v2pt_mqueue_t * queue, char *msg, uint msglen, int wait, int pri)
779{
780        STATUS error = OK;
781        TRACEF("%x %x %x",my_task(),queue,queue->first_susp);
782
783        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &queue->queue_lock);
784        if (!mqueue_find_lock(queue)) {
785                error = S_objLib_OBJ_ID_ERROR;
786                goto exit;
787        }
788        if (msglen > queue->msg_len) {
789                error = S_msgQLib_INVALID_MSG_LENGTH;
790                goto unlock;
791        } 
792
793        if (queue->msg_count > queue->msgs_per_queue) { // BUG > ?
794                //  Queue is full
795                error = waitToSend(queue, msg, msglen, wait, pri);
796        } else {
797                if (queue->msg_count == queue->msgs_per_queue) {
798                        // Full
799                        if ((queue->msgs_per_queue == 0) && queue->first_susp ) {
800                                //  Special case... Send the new message.
801                                send_msg_to(queue, msg, msglen);
802                        } else {
803                                if (pri == MSG_PRI_URGENT) {
804                                        //  Stuff the new message onto the queue.
805                                        urgent_msg_to(queue, msg, msglen);
806                                        pthread_cond_broadcast(&(queue->queue_send));
807                                } else
808                                        /*
809                                         **  Queue is full... if waiting on space is
810                                         **  allowed, wait until space becomes available
811                                         **  or the timeout expires.  If space becomes
812                                         **  available, send the caller's message.
813                                         */
814                                        error = waitToSend(queue, msg, msglen, wait, pri);
815                        }
816                } else {
817                        if (pri == MSG_PRI_URGENT) {
818                                //  Stuff the new message onto the front of the queue.
819                                urgent_msg_to(queue, msg, msglen);
820
821                                //  Signal the condition variable for the queue
822                                pthread_cond_broadcast(&(queue->queue_send));
823                        } else
824                                //  Send the new message to the back of the queue.
825                                send_msg_to(queue, msg, msglen);
826                }
827        }
828unlock:
829        pthread_mutex_unlock(&queue->queue_lock);
830exit:   {}
831        pthread_cleanup_pop(0);
832
833#if 0
834        if (error != OK) {
835                errno = (int) error;
836                error = ERROR;
837        }
838#endif
839
840        return (error);
841}
842
843/*****************************************************************************
844** delete_mqueue - takes care of destroying the specified queue and freeing
845**                any resources allocated for that queue
846*****************************************************************************/
847static void delete_mqueue(v2pt_mqueue_t * queue)
848{
849        TRACEF();
850        unlink_qcb(queue);
851       
852        if (queue->first_msg_in_queue)
853            free(queue->first_msg_in_queue);
854       
855        if (queue)
856            free(queue);
857}
858
859//#define pthread_cond_broadcast(args) do { TRACEF("pthread_cond_broadcast %x",args);pthread_cond_broadcast(args); } while (0)
860
861/*****************************************************************************
862** msgQDelete - removes the specified queue from the queue list and frees
863**              the memory allocated for the queue control block and extents.
864*****************************************************************************/
865STATUS msgQDelete(v2pt_mqueue_t * queue)
866{
867        STATUS error = OK;
868        TRACEF();
869
870        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &queue->queue_lock);
871        if ( ! mqueue_find_lock(queue)) {
872                error = S_objLib_OBJ_ID_ERROR;
873                goto exit;
874        }
875        queue->send_type = KILLD;
876
877        //taskLock();
878        if ( queue->first_susp || queue->first_write_susp ) {
879            printf("!!! Certain thread is waiting for this queue %x\n", (int) queue );
880                pthread_mutex_clean_lock(&queue->qdlet_lock);
881
882                TRACEF("%x %x", &queue->queue_send, &queue->queue_lock);
883                // Signal the condition variable for tasks waiting on messages in the queue
884                pthread_cond_broadcast(&queue->queue_send);
885
886                // Unlock the queue send mutex.
887                pthread_mutex_unlock(&queue->queue_lock);
888
889                // Lock mutex for queue space
890                pthread_mutex_clean_lock(&queue->qfull_lock);
891
892                // Signal the condition variable for tasks waiting on space to post messages into the queue
893                pthread_cond_broadcast(&queue->queue_space);
894
895                // Unlock the queue space mutex.
896                pthread_cleanup_pop(1); // queue->qfull_lock
897
898                /*
899                 **  Wait for all pended tasks to receive deletion signal.
900                 **  The last task to receive the deletion signal will signal the
901                 **  deletion-complete condition variable.
902                 */
903                // while ( queue->first_susp  && queue->first_write_susp ) { BUG
904                while ( queue->first_susp  || queue->first_write_susp ) {
905                        pthread_cond_wait(&queue->qdlet_cmplt, &queue->qdlet_lock);
906                }
907
908                // Unlock the queue delete completion mutex.
909                pthread_cleanup_pop(1);
910        } else {
911                // Unlock the queue mutex.
912                pthread_mutex_unlock(&(queue->queue_lock));
913        }
914
915        /*
916         **  No other tasks are pending on the queue by this point...
917         **  Now physically delete the queue.
918         */
919        delete_mqueue(queue);
920        //taskUnlock();
921exit:
922        {}
923        pthread_cleanup_pop(0);
924
925        if (error != OK) {
926                errno = (int) error;
927                error = ERROR;
928        }
929
930        return error;
931}
932
933/*****************************************************************************
934** waiting_on_q_msg - returns a nonzero result unless a qualifying event
935**                    occurs on the specified queue which should cause the
936**                    pended task to be awakened.  The qualifying events
937**                    are:
938**                        1. a message is sent to the queue and the current
939**                            task is selected to receive it
940**                        2. the queue is deleted
941*****************************************************************************/
942static int waiting_on_q_msg(v2pt_mqueue_t * queue, struct timespec *timeout, int *retcode)
943{
944        int result;
945        struct timeval now;
946        unsigned long usec;
947        TRACEF();
948        if (queue->send_type & KILLD) {
949                // Queue has been killed... waiting is over.
950                TRACEF("KILLED");
951                result = 0;
952                *retcode = 0;
953        } else {
954                result = 1;
955
956                /*
957                 **  Multiple messages sent to the queue may be represented by only
958                 **  a single signal to the condition variable, so continue
959                 **  checking for a message for our task as long as more messages
960                 **  are available.
961                 */
962                while (queue->msg_count > 0) {
963                        TRACEF("%i",queue->msg_count);
964                        // Message arrived... see if it's for our task.
965                        if (signal_for_my_task(&queue->first_susp, queue->order)) {
966                                /*
967                                 **  Message was  destined for our task... waiting is over.
968                                 */
969                                result = 0;
970                                *retcode = 0;
971                                break;
972                        } else {
973                                /*
974                                 **  Message isn't for our task... continue waiting.
975                                 **  Sleep awhile to allow other tasks ahead of ours in the
976                                 **  list of tasks waiting on the queue to get their
977                                 **  messages, bringing our task to the head of the list.
978                                 */
979                                pthread_mutex_unlock(&queue->queue_lock);
980                                OS_Delay(1);
981                                pthread_mutex_lock(&queue->queue_lock);
982                        }
983
984                        /*
985                         **  If a timeout was specified, make sure we respect it and
986                         **  exit this loop if it expires.
987                         */
988                        if (timeout != (struct timespec *) NULL) {
989                                gettimeofday(&now, (struct timezone *) NULL);
990                                if (timeout->tv_nsec > (now.tv_usec * 1000)) {
991                                        usec = (timeout->tv_nsec - (now.tv_usec * 1000)) / 1000;
992                                        if (timeout->tv_sec < now.tv_sec)
993                                                usec = 0;
994                                        else
995                                                usec += ((timeout->tv_sec - now.tv_sec) * 1000000);
996                                } else {
997                                        usec = ((timeout->tv_nsec + 1000000000) - (now.tv_usec * 1000)) / 1000;
998                                        if ((timeout->tv_sec - 1) < now.tv_sec)
999                                                usec = 0;
1000                                        else
1001                                                usec += (((timeout->tv_sec - 1) - now.tv_sec)
1002                                                                 * 1000000);
1003                                }
1004                                if (usec == 0)
1005                                        break;
1006                        }
1007                }
1008        }
1009
1010        return result;
1011}
1012
1013/*****************************************************************************
1014** msgQReceive - blocks the calling task until a message is available in the
1015**               specified v2pthread queue.
1016*****************************************************************************/
1017int msgQReceive(v2pt_mqueue_t * queue, char *msgbuf, uint buflen, int max_wait)
1018{
1019        DS_TASK_T *our_task;
1020        struct timeval now;
1021        struct timespec timeout;
1022        int retcode;
1023        int msglen = ERROR;
1024        long sec, usec;
1025        STATUS error  = OK;
1026
1027        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &queue->queue_lock);
1028        if (!mqueue_find_lock(queue)) 
1029        {
1030                TRACEF("S_objLib_OBJ_ID_ERROR");
1031                error = S_objLib_OBJ_ID_ERROR;  /* Invalid queue specified */
1032                goto exit;
1033        }
1034
1035        if (buflen < queue->msg_len) {
1036                TRACEF("S_msgQLib_INVALID_MSG_LENGTH %i %i ", buflen, queue->msg_len);
1037                error = S_msgQLib_INVALID_MSG_LENGTH;
1038                goto unlock;
1039        } 
1040        // Add task for task to list of tasks waiting on queue
1041        our_task = taskFind(0, 1);
1042        TRACEF("%x %x", our_task, queue);
1043        TRACEF("wait on queue list @ %p", our_task, &queue->first_susp);
1044
1045        link_susp_task(&queue->first_susp, our_task);
1046        //  If tasks waiting to write to a zero-length queue, notify
1047        //  waiting task that we're ready to receive a message.
1048        if ( !queue->msgs_per_queue && queue->first_write_susp ) {
1049                pthread_mutex_clean_lock(&queue->qfull_lock);
1050                TRACEF();
1051                //  Alert the waiting tasks that message space is available.
1052                pthread_cond_broadcast(&queue->queue_space);
1053                pthread_cleanup_pop(1);
1054        }
1055
1056        retcode = 0;
1057
1058        if (max_wait == NO_WAIT) {
1059                /*
1060                 **  Caller specified no wait on queue message...
1061                 **  Check the condition variable with an immediate timeout.
1062                 */
1063                gettimeofday(&now, NULL);
1064                timeout.tv_sec = now.tv_sec;
1065                timeout.tv_nsec = now.tv_usec * 1000;
1066                while ((waiting_on_q_msg(queue, &timeout, &retcode)) && (retcode != ETIMEDOUT)) {
1067                        retcode = pthread_cond_timedwait(&queue->queue_send,
1068                                        &queue->queue_lock, &timeout);
1069                }
1070        } else if (max_wait == WAIT_FOREVER) {
1071                //  Infinite wait was specified... wait without timeout.
1072                while (waiting_on_q_msg(queue, 0, &retcode)) {
1073                        pthread_cond_wait(&queue->queue_send, &queue->queue_lock);
1074                }
1075        } else {
1076                /*
1077                 **  Wait on queue message arrival with timeout...
1078                 **  Calculate timeout delay in seconds and microseconds.
1079                 */
1080                sec = 0;
1081                usec = max_wait * V2PT_TICK * 1000;
1082                gettimeofday(&now, (struct timezone *) NULL);
1083                usec += now.tv_usec;
1084                if (usec > 1000000) {
1085                        sec = usec / 1000000;
1086                        usec = usec % 1000000;
1087                }
1088                timeout.tv_sec = now.tv_sec + sec;
1089                timeout.tv_nsec = usec * 1000;
1090
1091                /*
1092                 **  Wait for a queue message for the current task or for the
1093                 **  timeout to expire.  The loop is required since the task
1094                 **  may be awakened by signals for messages which are
1095                 **  not ours, or for signals other than from a message send.
1096                 */
1097                while ((waiting_on_q_msg(queue, &timeout, &retcode)) && (retcode != ETIMEDOUT)) {
1098                        TRACEF("pthread_cond_timedwait { %x %x %i", &queue->queue_send, &queue->queue_lock,max_wait);
1099                        retcode = pthread_cond_timedwait(&queue->queue_send, &queue->queue_lock, &timeout);
1100                        TRACEF("pthread_cond_timedwait }");
1101                }
1102        }
1103
1104        /*
1105         **  Remove the calling task's task from the waiting task list
1106         **  for the queue.  Clear our TCB's suspend list pointer in
1107         **  case the queue was killed & its ctrl blk deallocated.
1108         */
1109        unlink_susp_task(&(queue->first_susp), our_task);
1110        //our_task->suspend_list = NULL;
1111
1112        /*
1113         **  See if we were awakened due to a msgQDelete on the queue.
1114         */
1115        if (queue->send_type & KILLD) {
1116                notify_if_delete_complete(queue);
1117                error = S_objLib_OBJ_DELETED;
1118                TRACEF("...queue deleted");
1119        } else if (retcode == ETIMEDOUT) {
1120                /*
1121                 **  Timed out without a message
1122                 */
1123                if (max_wait == NO_WAIT)
1124                        error = S_objLib_OBJ_UNAVAILABLE;
1125                else
1126                        error = S_objLib_OBJ_TIMEOUT;
1127                TRACEF("...timed out");
1128        } else {
1129                /*
1130                 **  A message was sent to the queue for this task...
1131                 **  Retrieve the message and clear the queue contents.
1132                 */
1133                msglen = (int) fetch_msg_from(queue, (char *) msgbuf);
1134                TRACEF("...rcvd queue msg @ %p", msgbuf);
1135        }
1136
1137        /*
1138         **  Unlock the mutex for the condition variable.
1139         */
1140unlock:
1141        pthread_mutex_unlock(&queue->queue_lock);
1142exit: {}
1143        /*
1144         **  Clean up the opening pthread_cleanup_push()
1145         */
1146        pthread_cleanup_pop(0);
1147
1148        if (error != OK) {
1149                errno = error;
1150                msglen = ERROR;
1151        }
1152        return msglen;
1153}
1154
1155/*****************************************************************************
1156** msgQNumMsgs - returns the number of messages currently posted to the
1157**               specified queue.
1158*****************************************************************************/
1159int msgQNumMsgs(v2pt_mqueue_t * queue)
1160{
1161        int num_msgs;
1162        // copuld be just return queue->msg_count
1163        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, (void *) &(queue->queue_lock));
1164        if (mqueue_find_lock(queue)) {
1165                num_msgs = queue->msg_count;
1166                pthread_mutex_unlock(&(queue->queue_lock));
1167        } else {
1168                num_msgs = (int) ERROR;
1169        }
1170
1171        pthread_cleanup_pop(0);
1172
1173        return (num_msgs);
1174}
1175
Note: See TracBrowser for help on using the repository browser.