source: svn/trunk/newcon3bcm2_21bu/nexus/modules/file/src/nexus_file_scheduller.c

Last change on this file was 2, checked in by jglee, 11 years ago

first commit

  • Property svn:executable set to *
File size: 25.8 KB
Line 
1/***************************************************************************
2 *     (c)2007-2011 Broadcom Corporation
3 *
4 *  This program is the proprietary software of Broadcom Corporation and/or its licensors,
5 *  and may only be used, duplicated, modified or distributed pursuant to the terms and
6 *  conditions of a separate, written license agreement executed between you and Broadcom
7 *  (an "Authorized License").  Except as set forth in an Authorized License, Broadcom grants
8 *  no license (express or implied), right to use, or waiver of any kind with respect to the
9 *  Software, and Broadcom expressly reserves all rights in and to the Software and all
10 *  intellectual property rights therein.  IF YOU HAVE NO AUTHORIZED LICENSE, THEN YOU
11 *  HAVE NO RIGHT TO USE THIS SOFTWARE IN ANY WAY, AND SHOULD IMMEDIATELY
12 *  NOTIFY BROADCOM AND DISCONTINUE ALL USE OF THE SOFTWARE.
13 *
14 *  Except as expressly set forth in the Authorized License,
15 *
16 *  1.     This program, including its structure, sequence and organization, constitutes the valuable trade
17 *  secrets of Broadcom, and you shall use all reasonable efforts to protect the confidentiality thereof,
18 *  and to use this information only in connection with your use of Broadcom integrated circuit products.
19 *
20 *  2.     TO THE MAXIMUM EXTENT PERMITTED BY LAW, THE SOFTWARE IS PROVIDED "AS IS"
21 *  AND WITH ALL FAULTS AND BROADCOM MAKES NO PROMISES, REPRESENTATIONS OR
22 *  WARRANTIES, EITHER EXPRESS, IMPLIED, STATUTORY, OR OTHERWISE, WITH RESPECT TO
23 *  THE SOFTWARE.  BROADCOM SPECIFICALLY DISCLAIMS ANY AND ALL IMPLIED WARRANTIES
24 *  OF TITLE, MERCHANTABILITY, NONINFRINGEMENT, FITNESS FOR A PARTICULAR PURPOSE,
25 *  LACK OF VIRUSES, ACCURACY OR COMPLETENESS, QUIET ENJOYMENT, QUIET POSSESSION
26 *  OR CORRESPONDENCE TO DESCRIPTION. YOU ASSUME THE ENTIRE RISK ARISING OUT OF
27 *  USE OR PERFORMANCE OF THE SOFTWARE.
28 *
29 *  3.     TO THE MAXIMUM EXTENT PERMITTED BY LAW, IN NO EVENT SHALL BROADCOM OR ITS
30 *  LICENSORS BE LIABLE FOR (i) CONSEQUENTIAL, INCIDENTAL, SPECIAL, INDIRECT, OR
31 *  EXEMPLARY DAMAGES WHATSOEVER ARISING OUT OF OR IN ANY WAY RELATING TO YOUR
32 *  USE OF OR INABILITY TO USE THE SOFTWARE EVEN IF BROADCOM HAS BEEN ADVISED OF
33 *  THE POSSIBILITY OF SUCH DAMAGES; OR (ii) ANY AMOUNT IN EXCESS OF THE AMOUNT
34 *  ACTUALLY PAID FOR THE SOFTWARE ITSELF OR U.S. $1, WHICHEVER IS GREATER. THESE
35 *  LIMITATIONS SHALL APPLY NOTWITHSTANDING ANY FAILURE OF ESSENTIAL PURPOSE OF
36 *  ANY LIMITED REMEDY.
37 *
38 * $brcm_Workfile: nexus_file_scheduller.c $
39 * $brcm_Revision: 15 $
40 * $brcm_Date: 5/13/11 4:13p $
41 *
42 * Module Description:
43 *
44 * Revision History:
45 *
46 * $brcm_Log: /nexus/modules/file/src/nexus_file_scheduller.c $
47 *
48 * 15   5/13/11 4:13p erickson
49 * SWDTV-6386: add "nx_" namespace prefix for internal nexus threads
50 *
51 * 14   4/20/11 7:49p vsilyaev
52 * SW7425-394: Allow application to set number of I/O worker threads that
53 *  are serving I/O requests
54 *
55 * 13   4/18/11 1:47p erickson
56 * SW7420-1123: fix 2.6.18 warning
57 *
58 * 12   4/6/11 6:53p vsilyaev
59 * SW7425-232: Removed MuxFileIo API to avoid naming clashes
60 *
61 * 11   4/6/11 4:53p vsilyaev
62 * SW7425-232: Added MuxFile interface
63 *
64 * 10   5/11/10 2:16p jtna
65 * SW7125-311: Coverity Defect ID:20334 CHECKED_RETURN
66 *
67 * 9   4/2/10 2:37p erickson
68 * SW7405-3833: nexus_file_pvr.h is now part of the public API
69 *
70 * 8   10/9/09 11:28a vsilyaev
71 * SW7405-3126: Continue write operatrion until entire block is written
72 *
73 * 7   1/20/09 10:52a erickson
74 * PR51175: fix NEXUS_FilePlay_Cancel. NEXUS_File_P_Item was getting
75 *  processed twice.
76 *
77 * 6   12/30/08 6:50p vsilyaev
78 * PR 50582: Fixed condition to spawn a timer
79 *
80 * 5   12/29/08 4:53p erickson
81 * PR50738: fix malloc size
82 *
83 * 4   12/11/08 2:05p erickson
84 * PR47854: clarify warning in NEXUS_FilePlay_Cancel
85 *
86 * 3   11/18/08 4:55p erickson
87 * PR47854: added NEXUS_FilePlay_Cancel to recover from hung threads due
88 *  to bad I/O
89 *
90 * 2   4/2/08 11:31a erickson
91 * PR40198: fix DEBUG=n warning
92 *
93 * 1   1/18/08 2:16p jgarrett
94 * PR 38808: Merging to main branch
95 *
96 * Nexus_Devel/4   11/7/07 5:34p vsilyaev
97 * PR 36788: Updated file API
98 *
99 * Nexus_Devel/3   10/15/07 5:10p vsilyaev
100 * PR 35824: Fixed queue operations
101 *
102 * Nexus_Devel/2   10/12/07 5:11p vsilyaev
103 * PR 35824: Added O_DIRECT flag, fixed lock/unlock order
104 *
105 * Nexus_Devel/1   10/10/07 3:57p vsilyaev
106 * PR 35824: File I/O module
107 *
108 * $copied_brcm_Log: /BSEAV/api/src/pvr/bsettop_fileio.c $
109 * $copied_brcm_Revision: 38 $
110 * $copied_brcm_Date: 9/24/07 3:52p $
111 **************************************************************************/
112#include "nexus_file_module.h"
113#include "bfile_io.h"
114#include "nexus_file_pvr.h"
115#include "nexus_file_muxio.h"
116
117BDBG_MODULE(nexus_file_scheduller);
118#define BDBG_MSG_TRACE(x) BDBG_MSG(x)
119
120struct NEXUS_File_P_Item {
121    BLST_S_ENTRY(NEXUS_File_P_Item) list;
122    enum {
123      ioType_Read,
124      ioType_Write,
125      ioType_MuxRead,
126      ioType_MuxWrite
127    } ioType;
128    union {
129        struct {
130            bfile_io_read_t fd;
131        } read;
132        struct {
133            bfile_io_write_t fd;
134        } write;
135        struct {
136            bfile_io_mux_t fd;
137            off_t offset;
138        } muxRead;
139        struct {
140            bfile_io_mux_t fd;
141            off_t offset;
142        } muxWrite;
143    } io;
144    void *buf; /* copy of buf */
145    size_t length;
146    void *cntx;
147    NEXUS_File_Callback callback;
148    NEXUS_ModuleHandle module;
149    ssize_t result;
150};
151
152BLST_S_HEAD(NEXUS_File_P_Queue, NEXUS_File_P_Item);
153
154struct NEXUS_File_P_IoWorker {
155    NEXUS_ThreadHandle thread;
156    unsigned no;
157    struct NEXUS_File_P_Item *current;
158};
159
160#define NEXUS_FILE_MAX_IOWORKERS  16
161struct NEXUS_File_P_Scheduller {
162    NEXUS_FileModuleSettings settings;
163    BKNI_EventHandle signal; /* signal to wakeup the thread */
164    unsigned kill_count; /* number of task to be killed */
165    unsigned workerThreads;
166    struct NEXUS_File_P_Queue free, queued, completed;
167    struct NEXUS_File_P_Item *items;
168    NEXUS_TimerHandle timer;
169    struct NEXUS_File_P_IoWorker *workers[NEXUS_FILE_MAX_IOWORKERS];/* worker threads to run */
170#define MAX_ZOMBIES 10
171    struct NEXUS_File_P_IoWorker *zombies[MAX_ZOMBIES];
172};
173
174static struct NEXUS_File_P_Scheduller g_NEXUS_File_P_Scheduller;
175
176#define pScheduler (&g_NEXUS_File_P_Scheduller)
177
178static void NEXUS_P_File_Scheduller_Thread(void *w);
179
#if 0
/*
 * Debug helpers (compiled out): print how many items are on the free, queued
 * and completed lists. The lists are shared with the worker threads, so call
 * print_queues() with the File module lock held.
 */
static unsigned NEXUS_File_P_QueueLength(struct NEXUS_File_P_Queue *queue)
{
    struct NEXUS_File_P_Item *e;
    unsigned count = 0;

    for (e = BLST_S_FIRST(queue); e; e = BLST_S_NEXT(e, list)) {
        count++;
    }
    return count;
}

static void print_queues(void)
{
    /* counts are unsigned, so print them with %u */
    BKNI_Printf("free %u, ", NEXUS_File_P_QueueLength(&pScheduler->free));
    BKNI_Printf("queue %u, ", NEXUS_File_P_QueueLength(&pScheduler->queued));
    BKNI_Printf("completed %u\n", NEXUS_File_P_QueueLength(&pScheduler->completed));
}
#endif
205
206void
207NEXUS_FilePlay_Cancel(NEXUS_FilePlayHandle file)
208{
209    unsigned i;
210    for (i=0;i<pScheduler->workerThreads;i++) {
211        struct NEXUS_File_P_IoWorker *worker = pScheduler->workers[i];
212        if (worker->current && worker->current->ioType == ioType_Read && worker->current->io.read.fd == file->file.data) {
213            char name[16];
214
215            BDBG_WRN(("Cancelling File worker %p", pScheduler->workers[i]));
216            /* this item's callback must be called with a failure code in order for other sw state to unwind. */
217
218            /* the new worker gets the item, but the item has failed and should be added to the completed queue now. */
219            pScheduler->workers[i] = BKNI_Malloc(sizeof(struct NEXUS_File_P_IoWorker));
220            pScheduler->workers[i]->no = worker->no;
221            pScheduler->workers[i]->current = NULL; /* don't give the item to the new worker. it will be serviced from NEXUS_P_File_TryCompleted. */
222            worker->current->result = -1;
223            BLST_S_INSERT_HEAD(&pScheduler->completed, worker->current, list);
224
225            /* the cancelled worker no longer owns the item */
226            worker->current = NULL;
227
228            BKNI_Snprintf(name, sizeof(name), "nx_io_worker%u", i);
229            pScheduler->workers[i]->thread = NEXUS_Thread_Create(name, NEXUS_P_File_Scheduller_Thread, pScheduler->workers[i], &pScheduler->settings.schedullerSettings);
230            if(!pScheduler->workers[i]->thread) {
231                BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
232                /* no recovery */
233            }
234            /* only one can be active at a time */
235            return;
236        }
237    }
238    BDBG_WRN(("NEXUS_FilePlay_Cancel unable to find a pending NEXUS_FilePlayHandle %p. Are you sure I/O is hung on this file?", file));
239}
240
241static void NEXUS_File_P_CleanupZombies(void)
242{
243    int i;
244    for (i=0;i<MAX_ZOMBIES;i++) {
245        if (pScheduler->zombies[i]) {
246            BDBG_WRN(("Cleaned up zombie %p [%d]", pScheduler->zombies[i], i));
247            NEXUS_Thread_Destroy(pScheduler->zombies[i]->thread);
248            BKNI_Free(pScheduler->zombies[i]);
249            pScheduler->zombies[i] = NULL;
250        }
251    }
252}
253
254static void NEXUS_File_P_AddZombie(struct NEXUS_File_P_IoWorker *worker)
255{
256    int i;
257    for (i=0;i<MAX_ZOMBIES;i++) {
258        if (!pScheduler->zombies[i]) {
259            pScheduler->zombies[i] = worker;
260            return;
261        }
262    }
263    BDBG_ERR(("Unable to keep zombie. NEXUS_ThreadHandle leak."));
264}
265
266void
267NEXUS_File_AsyncRead(NEXUS_FileReadHandle fd, void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
268{
269    struct NEXUS_File_P_Item *e;
270    BDBG_ASSERT(fd);
271    BDBG_ASSERT(buf);
272    BDBG_ASSERT(module);
273    BDBG_ASSERT(callback);
274
275    NEXUS_File_P_CleanupZombies();
276
277    e = BLST_S_FIRST(&pScheduler->free);
278    if(e) {
279        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
280        e->ioType = ioType_Read;
281        e->io.read.fd = fd;
282        e->buf = buf;
283        e->length = length;
284        e->cntx = cntx;
285        e->module = module;
286        e->callback = callback;
287        e->result = -1;
288        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
289        BDBG_MSG_TRACE(("rd: queue %#lx", (unsigned long)e));
290        BKNI_SetEvent(pScheduler->signal);
291        return;
292    } else {
293        BDBG_WRN(("NEXUS_File_AsyncRead: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
294        callback(cntx, -1);
295        return;
296    }
297}
298
299void
300NEXUS_File_AsyncWrite(NEXUS_FileWriteHandle fd, const void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
301{
302    struct NEXUS_File_P_Item *e;
303    BDBG_ASSERT(fd);
304    BDBG_ASSERT(buf);
305    BDBG_ASSERT(module);
306    BDBG_ASSERT(callback);
307
308    NEXUS_File_P_CleanupZombies();
309
310    e = BLST_S_FIRST(&pScheduler->free);
311    if(e) {
312        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
313        e->ioType = ioType_Write;
314        e->io.write.fd = fd;
315        e->buf = (void *)buf;
316        e->length = length;
317        e->cntx = cntx;
318        e->module = module;
319        e->callback = callback;
320        e->result = -1;
321        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
322        BDBG_MSG_TRACE(("wr: queue %#lx", (unsigned long)e));
323        BKNI_SetEvent(pScheduler->signal);
324        return;
325    } else {
326        BDBG_WRN(("NEXUS_File_AsyncWrite: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
327        callback(cntx, -1);
328        return;
329    }
330    return;
331}
332
333/* calls all completed callbacks */
334static void
335NEXUS_P_File_TryCompleted(struct NEXUS_File_P_Scheduller *sched)
336{
337    if(BLST_S_FIRST(&pScheduler->completed)) {
338        struct NEXUS_File_P_Item *e;
339        struct NEXUS_File_P_Queue completed = sched->completed; /* save copy of completed queue */
340        BLST_S_INIT(&sched->completed); /* clear completed queue */
341        while( NULL!=(e=BLST_S_FIRST(&completed))) {
342            struct NEXUS_File_P_Queue *queue;
343            bool locked;
344
345            BLST_S_REMOVE_HEAD(&completed, list);
346            NEXUS_UnlockModule();
347            locked = NEXUS_Module_TryLock(e->module);
348            queue = &sched->completed;
349            if(locked) {
350                queue = &sched->free;
351                /* call callback and add into the free list */
352                e->callback(e->cntx, e->result);
353                NEXUS_Module_Unlock(e->module);
354            }
355            NEXUS_LockModule();
356            BLST_S_INSERT_HEAD(queue, e, list);
357        }
358    }
359    return;
360}
361
362static void
363NEXUS_P_File_CallCompleted(void *s)
364{
365    struct NEXUS_File_P_Scheduller *sched = s;
366    BDBG_MSG_TRACE(("NEXUS_P_File_CallCompleted: %#lx", (unsigned long)s));
367    sched->timer = NULL;
368    NEXUS_P_File_TryCompleted(sched);
369    if(BLST_S_FIRST(&pScheduler->completed)) {
370        sched->timer = NEXUS_ScheduleTimer(10, NEXUS_P_File_CallCompleted, sched);
371    }
372    return;
373}
374
375
376static NEXUS_Error
377NEXUS_P_File_FindAndDeque(struct NEXUS_File_P_Scheduller *sched, struct NEXUS_File_P_IoWorker *worker)
378{
379    int best_priority = 0;
380    int priority = 0;
381    struct NEXUS_File_P_Item *e= BLST_S_FIRST(&sched->queued);
382    struct NEXUS_File_P_Item *active = e;
383    struct NEXUS_File_P_Item *prev_active = NULL; /* keep prev around for easier delete */
384    struct NEXUS_File_P_Item *prev = NULL;
385    ssize_t size = 0;
386    struct NEXUS_File_P_Queue *queue;
387    bool locked;
388
389    BSTD_UNUSED(worker);
390    if(!e) {
391        NEXUS_UnlockModule(); /* drop lock */
392        BDBG_MSG(("%u: queue is empty", worker->no));
393        /* coverity[check_return] */
394        BKNI_WaitForEvent(sched->signal, 100); /* wait for 100 ms */
395        NEXUS_LockModule(); /* restore lock */
396        return 0;
397    }
398    BDBG_MSG_TRACE(("%u: ready %#lx", worker->no, (unsigned long)e));
399    do {
400        switch(e->ioType) {
401        case ioType_Write:
402            BDBG_ASSERT(e->io.write.fd);
403            priority = e->io.write.fd->priority.get(e->io.write.fd->priority.cntx);
404            BDBG_MSG_TRACE(("file[wr] %#x priority %d", e->io.write.fd, priority));
405            break;
406        case ioType_Read:
407            BDBG_ASSERT(e->io.read.fd);
408            priority = e->io.read.fd->priority.get(e->io.read.fd->priority.cntx);
409            BDBG_MSG_TRACE(("file[rd] %#x priority %d", e->io.write.fd, priority));
410            break;
411        case ioType_MuxWrite:
412            BDBG_ASSERT(e->io.read.fd);
413            priority = e->io.muxWrite.fd->priority.get(e->io.muxWrite.fd->priority.cntx);
414            BDBG_MSG_TRACE(("file[muxWr] %#x priority %d", e->io.muxWrite.fd, priority));
415            break;
416        case ioType_MuxRead:
417            BDBG_ASSERT(e->io.read.fd);
418            priority = e->io.muxRead.fd->priority.get(e->io.muxRead.fd->priority.cntx);
419            BDBG_MSG_TRACE(("file[muxRd] %#x priority %d", e->io.muxRead.fd, priority));
420            break;
421        default:
422            BDBG_ASSERT(0);
423            break;
424        }
425        if(best_priority<=priority) { /* since we have LIFO, compensate for it, by giving more priority to last items */
426            active = e;
427            prev_active = prev;
428            best_priority = priority;
429        }
430        prev = e;
431    } while(NULL!=(e = BLST_S_NEXT(e, list)));
432    BDBG_ASSERT(active);
433    /* remove from the queued list */
434    e=active;
435    worker->current = e;
436    BDBG_MSG_TRACE(("%u: dequeue %#lx", worker->no, (unsigned long)e));
437    if(prev_active) {
438        BLST_S_REMOVE_NEXT(&sched->queued, prev_active, list);
439    } else {
440        BLST_S_REMOVE_HEAD(&sched->queued, list);
441    }
442    NEXUS_UnlockModule(); /* drop lock */
443    switch(e->ioType) {
444    case ioType_Write:
445        {
446        size_t write_size = e->length;
447        BDBG_ASSERT(e->io.write.fd);
448        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:wr]:+ %#lx %#lx %u %#lx %#lx", worker->no, (unsigned long)e->io.write.fd, (unsigned long)e->buf, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
449#if 0
450        if(e->length/2>4096) {
451            write_size = e->length/2 ;
452            write_size -= write_size%4096;
453        }
454#endif
455        size = e->io.write.fd->write(e->io.write.fd, e->buf, write_size);
456        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:wr]:- %#lx %#lx %u->%d %#lx %#lx", worker->no, (unsigned long)e->io.write.fd, (unsigned long)e->buf, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
457        break;
458        }
459    case ioType_Read:
460        BDBG_ASSERT(e->io.read.fd);
461        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:rd]:+ %#lx %#lx %u %#lx %#lx", worker->no, (unsigned long)e->io.read.fd, (unsigned long)e->buf, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
462        size = e->io.read.fd->read(e->io.read.fd, e->buf, e->length);
463
464#if 0
465/* PR 47854 - Test code to simulate an I/O error. Playback will cancel the thread and File should recover.
466To give a realistic test, we must use random time. The max hang time must be sometimes greater than the
467NEXUS_Playback_P_CheckWaitingIo timeout. */
468{
469    static int count = 0;
470    static int next_hang = 100;
471    if (++count % next_hang == 0) {
472        unsigned hang_time = rand() % 8000;
473        BDBG_WRN(("begin induced %d hang in worker %p", hang_time, worker));
474        BKNI_Sleep(hang_time);
475        BDBG_WRN(("end induced hang in worker %p", worker));
476        next_hang = (rand() % 200) + 1;
477    }
478}
479#endif
480        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:rd]:- %#lx %#lx %u->%d %#lx %#lx", worker->no, (unsigned long)e->io.read.fd, (unsigned long)e->buf, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
481        break;
482    case ioType_MuxWrite:
483        BDBG_ASSERT(e->io.muxWrite.fd);
484        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxWr]:+ %#lx %#lx %lu:%u %#lx %#lx", worker->no, (unsigned long)e->io.muxWrite.fd, (unsigned long)e->buf, (unsigned long)e->io.muxWrite.offset, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
485        size = e->io.muxWrite.fd->write(e->io.muxWrite.fd, e->io.muxWrite.offset, e->buf, e->length);
486        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxWr]:- %#lx %#lx %lu:%u->%d %#lx %#lx", worker->no, (unsigned long)e->io.muxWrite.fd, (unsigned long)e->buf, (unsigned long)e->io.muxWrite.offset, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
487        break;
488    case ioType_MuxRead:
489        BDBG_ASSERT(e->io.muxRead.fd);
490        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxRd]:+ %#lx %#lx %lu:%u %#lx %#lx", worker->no, (unsigned long)e->io.muxRead.fd, (unsigned long)e->buf, (unsigned long)e->io.muxRead.offset, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
491        size = e->io.muxWrite.fd->read(e->io.muxRead.fd, e->io.muxRead.offset, e->buf, e->length);
492        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxRd]:- %#lx %#lx %lu:%u->%d %#lx %#lx", worker->no, (unsigned long)e->io.muxRead.fd, (unsigned long)e->buf, (unsigned long)e->io.muxRead.offset, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
493        break;
494    default:
495        BDBG_ASSERT(0);
496        break;
497    }
498
499    /* Do not dereference e until verifying worker->current. It may not be owned by this thread any more. */
500    if (!worker->current) {
501        NEXUS_LockModule();
502        /* this worker was cancelled */
503        BDBG_WRN(("Cancelled io completed. Worker %p is now a zombie.", worker));
504        NEXUS_File_P_AddZombie(worker);
505        return -1; /* this will cause the thread's loop to terminate */
506    }
507
508    e->result = size;
509    locked = NEXUS_Module_TryLock(e->module);
510    BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque: %#lx:%#lx %slocked", (unsigned long)sched, (unsigned long)e, locked?"":"not"));
511
512    queue = &sched->completed;
513    if(locked) {
514        worker->current = NULL;
515        queue = &sched->free;
516        /* call callback and add into the free list */
517        e->callback(e->cntx, size);
518        NEXUS_Module_Unlock(e->module);
519    }
520    NEXUS_LockModule();
521    worker->current = NULL;
522    BLST_S_INSERT_HEAD(queue, e, list);
523    if(queue==&sched->completed && sched->timer==NULL) {
524        sched->timer = NEXUS_ScheduleTimer(10, NEXUS_P_File_CallCompleted, sched);
525    }
526
527    return 0;
528}
529
530static void
531NEXUS_P_File_Scheduller_Thread(void *w)
532{
533    struct NEXUS_File_P_IoWorker *worker=w;
534    struct NEXUS_File_P_Scheduller *sched=pScheduler;
535
536    BDBG_MSG(("NEXUS_P_File_Scheduller_Thread: %u started", (unsigned long)worker));
537    NEXUS_LockModule();
538    for(;;) {
539        if(sched->kill_count>0) {
540            sched->kill_count--;
541            break;
542        }
543        NEXUS_P_File_TryCompleted(sched);
544        if (NEXUS_P_File_FindAndDeque(sched, worker)) {
545            goto zombie;
546        }
547    }
548    NEXUS_UnlockModule();
549    BDBG_MSG(("NEXUS_P_File_Scheduller_Thread: %u stopped", (unsigned long)worker));
550    return;
551
552zombie:
553    NEXUS_UnlockModule();
554    BDBG_MSG(("NEXUS_P_File_Scheduller_Thread zombie %p stopped", (unsigned long)worker));
555    return;
556}
557
558NEXUS_Error
559NEXUS_File_P_Scheduller_Start(const NEXUS_FileModuleSettings *settings)
560{
561    unsigned i;
562    BERR_Code rc;
563
564    BDBG_ASSERT(settings);
565    if(settings->workerThreads >= NEXUS_FILE_MAX_IOWORKERS || settings->workerThreads<1) { return BERR_TRACE(NEXUS_NOT_SUPPORTED); }
566
567    pScheduler->workerThreads = settings->workerThreads;
568    pScheduler->settings = *settings;
569    pScheduler->kill_count = 0;
570    pScheduler->timer = NULL;
571    BLST_S_INIT(&pScheduler->free);
572    BLST_S_INIT(&pScheduler->queued);
573    BLST_S_INIT(&pScheduler->completed);
574    BKNI_Memset(pScheduler->zombies, 0, sizeof(pScheduler->zombies));
575    pScheduler->items = BKNI_Malloc(sizeof(*pScheduler->items)*settings->maxQueuedElements);
576    if(!pScheduler->items) { rc = BERR_TRACE(NEXUS_OUT_OF_SYSTEM_MEMORY); goto err_alloc;}
577    for(i=settings->maxQueuedElements;i>0;i--) { /* insert elements in the reverse order */
578        BLST_S_INSERT_HEAD(&pScheduler->free, &pScheduler->items[i-1], list);
579    }
580    rc = BKNI_CreateEvent(&pScheduler->signal);
581    if(rc!=BERR_SUCCESS) { rc = BERR_TRACE(rc); goto err_event;}
582    for(i=0;i<pScheduler->workerThreads;i++) {
583        pScheduler->workers[i] = BKNI_Malloc(sizeof(struct NEXUS_File_P_IoWorker));
584        pScheduler->workers[i]->no = i;
585        pScheduler->workers[i]->thread = NULL;
586        pScheduler->workers[i]->current = NULL;
587    }
588    for(i=0;i<pScheduler->workerThreads;i++) {
589        char name[16];
590        BKNI_Snprintf(name, sizeof(name), "nx_io_worker%u", i);
591        pScheduler->workers[i]->thread = NEXUS_Thread_Create(name, NEXUS_P_File_Scheduller_Thread, pScheduler->workers[i], &settings->schedullerSettings);
592        if(!pScheduler->workers[i]->thread) {
593            rc=BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
594            /* there is no way to stop single thread, so do generic stop */
595            NEXUS_File_P_Scheduller_Stop();
596            goto error;
597        }
598    }
599    return BERR_SUCCESS;
600
601err_event:
602    BKNI_Free(pScheduler->items);
603err_alloc:
604error:
605    return rc;
606}
607
608void
609NEXUS_File_P_Scheduller_Stop(void)
610{
611    unsigned i;
612
613    NEXUS_LockModule();
614    pScheduler->kill_count = 0;
615    /* we need to account for the case when not all threads were initialized */
616    for(i=0;i<pScheduler->workerThreads;i++) {
617        if(!pScheduler->workers[i]->thread) {
618            break;
619        }
620        pScheduler->kill_count++;
621    }
622    for(;;) {
623        unsigned count = pScheduler->kill_count;
624        NEXUS_UnlockModule();
625        if(count==0) {
626            break;
627        }
628        for(i=0;i<count;i++) {
629            BDBG_MSG(("NEXUS_File_P_Scheduller_Stop: waking thread %u out of %u", i, count));
630            BKNI_SetEvent(pScheduler->signal);
631            BKNI_Sleep(10); /* sleep 10 ms to let worker wake-up */
632        }
633        NEXUS_LockModule();
634    }
635    for(i=0;i<pScheduler->workerThreads;i++) {
636        if(!pScheduler->workers[i]->thread) {
637            break;
638        }
639        NEXUS_Thread_Destroy(pScheduler->workers[i]->thread);
640        BKNI_Free(pScheduler->workers[i]);
641    }
642    NEXUS_File_P_CleanupZombies();
643    if(pScheduler->timer) {
644        NEXUS_CancelTimer(pScheduler->timer);
645    }
646    BKNI_DestroyEvent(pScheduler->signal);
647    BKNI_Free(pScheduler->items);
648    return;
649}
650
651void
652NEXUS_File_AsyncMuxWrite(NEXUS_MuxFileIoHandle fd, off_t offset, const void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
653{
654    struct NEXUS_File_P_Item *e;
655    BDBG_ASSERT(fd);
656    BDBG_ASSERT(buf);
657    BDBG_ASSERT(module);
658    BDBG_ASSERT(callback);
659
660    NEXUS_File_P_CleanupZombies();
661
662    e = BLST_S_FIRST(&pScheduler->free);
663    if(e) {
664        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
665        e->ioType = ioType_MuxWrite;
666        e->io.muxWrite.fd = fd;
667        e->io.muxWrite.offset = offset;
668        e->buf = (void *)buf;
669        e->length = length;
670        e->cntx = cntx;
671        e->module = module;
672        e->callback = callback;
673        e->result = -1;
674        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
675        BDBG_MSG_TRACE(("muxWr: queue %#lx", (unsigned long)e));
676        BKNI_SetEvent(pScheduler->signal);
677        return;
678    } else {
679        BDBG_WRN(("NEXUS_File_AsyncMuxWrite: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
680        callback(cntx, -1);
681        return;
682    }
683    return;
684}
685
686void
687NEXUS_File_AsyncMuxRead(NEXUS_MuxFileIoHandle fd, off_t offset, void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
688{
689    struct NEXUS_File_P_Item *e;
690    BDBG_ASSERT(fd);
691    BDBG_ASSERT(buf);
692    BDBG_ASSERT(module);
693    BDBG_ASSERT(callback);
694
695    NEXUS_File_P_CleanupZombies();
696
697    e = BLST_S_FIRST(&pScheduler->free);
698    if(e) {
699        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
700        e->ioType = ioType_MuxRead;
701        e->io.muxRead.fd = fd;
702        e->io.muxRead.offset = offset;
703        e->buf = (void *)buf;
704        e->length = length;
705        e->cntx = cntx;
706        e->module = module;
707        e->callback = callback;
708        e->result = -1;
709        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
710        BDBG_MSG_TRACE(("muxRd: queue %#lx", (unsigned long)e));
711        BKNI_SetEvent(pScheduler->signal);
712        return;
713    } else {
714        BDBG_WRN(("NEXUS_File_AsyncMuxRead: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
715        callback(cntx, -1);
716        return;
717    }
718    return;
719}
720
Note: See TracBrowser for help on using the repository browser.