source: svn/trunk/newcon3bcm2_21bu/nexus/modules/file/src/nexus_file_scheduler.c

Last change on this file was 2, checked in by jglee, 11 years ago

first commit

  • Property svn:executable set to *
File size: 26.4 KB
Line 
1/***************************************************************************
2 *     (c)2007-2011 Broadcom Corporation
3 *
4 *  This program is the proprietary software of Broadcom Corporation and/or its licensors,
5 *  and may only be used, duplicated, modified or distributed pursuant to the terms and
6 *  conditions of a separate, written license agreement executed between you and Broadcom
7 *  (an "Authorized License").  Except as set forth in an Authorized License, Broadcom grants
8 *  no license (express or implied), right to use, or waiver of any kind with respect to the
9 *  Software, and Broadcom expressly reserves all rights in and to the Software and all
10 *  intellectual property rights therein.  IF YOU HAVE NO AUTHORIZED LICENSE, THEN YOU
11 *  HAVE NO RIGHT TO USE THIS SOFTWARE IN ANY WAY, AND SHOULD IMMEDIATELY
12 *  NOTIFY BROADCOM AND DISCONTINUE ALL USE OF THE SOFTWARE.
13 *
14 *  Except as expressly set forth in the Authorized License,
15 *
16 *  1.     This program, including its structure, sequence and organization, constitutes the valuable trade
17 *  secrets of Broadcom, and you shall use all reasonable efforts to protect the confidentiality thereof,
18 *  and to use this information only in connection with your use of Broadcom integrated circuit products.
19 *
20 *  2.     TO THE MAXIMUM EXTENT PERMITTED BY LAW, THE SOFTWARE IS PROVIDED "AS IS"
21 *  AND WITH ALL FAULTS AND BROADCOM MAKES NO PROMISES, REPRESENTATIONS OR
22 *  WARRANTIES, EITHER EXPRESS, IMPLIED, STATUTORY, OR OTHERWISE, WITH RESPECT TO
23 *  THE SOFTWARE.  BROADCOM SPECIFICALLY DISCLAIMS ANY AND ALL IMPLIED WARRANTIES
24 *  OF TITLE, MERCHANTABILITY, NONINFRINGEMENT, FITNESS FOR A PARTICULAR PURPOSE,
25 *  LACK OF VIRUSES, ACCURACY OR COMPLETENESS, QUIET ENJOYMENT, QUIET POSSESSION
26 *  OR CORRESPONDENCE TO DESCRIPTION. YOU ASSUME THE ENTIRE RISK ARISING OUT OF
27 *  USE OR PERFORMANCE OF THE SOFTWARE.
28 *
29 *  3.     TO THE MAXIMUM EXTENT PERMITTED BY LAW, IN NO EVENT SHALL BROADCOM OR ITS
30 *  LICENSORS BE LIABLE FOR (i) CONSEQUENTIAL, INCIDENTAL, SPECIAL, INDIRECT, OR
31 *  EXEMPLARY DAMAGES WHATSOEVER ARISING OUT OF OR IN ANY WAY RELATING TO YOUR
32 *  USE OF OR INABILITY TO USE THE SOFTWARE EVEN IF BROADCOM HAS BEEN ADVISED OF
33 *  THE POSSIBILITY OF SUCH DAMAGES; OR (ii) ANY AMOUNT IN EXCESS OF THE AMOUNT
34 *  ACTUALLY PAID FOR THE SOFTWARE ITSELF OR U.S. $1, WHICHEVER IS GREATER. THESE
35 *  LIMITATIONS SHALL APPLY NOTWITHSTANDING ANY FAILURE OF ESSENTIAL PURPOSE OF
36 *  ANY LIMITED REMEDY.
37 *
38 * $brcm_Workfile: nexus_file_scheduler.c $
39 * $brcm_Revision: 18 $
40 * $brcm_Date: 11/30/11 1:37p $
41 *
42 * Module Description:
43 *
44 * Revision History:
45 *
46 * $brcm_Log: /nexus/modules/file/src/nexus_file_scheduler.c $
47 *
48 * 18   11/30/11 1:37p vsilyaev
49 * SW7425-1790: Added support for S/G file mux I/O
50 *
51 * SW7425-324/1   11/21/11 1:01p vsilyaev
52 * SW7425-1790: Added support for S/G file mux I/O
53 *
54 * 17   9/6/11 10:10a erickson
55 * SWNOOS-482: correct internal spelling
56 *
57 * 16   9/6/11 10:05a erickson
58 * SWNOOS-482: add NEXUS_FileModuleSettings.schedulerSettings[] per-thread
59 *  array
60 *
61 * 15   5/13/11 4:13p erickson
62 * SWDTV-6386: add "nx_" namespace prefix for internal nexus threads
63 *
64 * 14   4/20/11 7:49p vsilyaev
65 * SW7425-394: Allow application to set number of I/O worker threads that
66 *  are serving I/O requests
67 *
68 * 13   4/18/11 1:47p erickson
69 * SW7420-1123: fix 2.6.18 warning
70 *
71 * 12   4/6/11 6:53p vsilyaev
72 * SW7425-232: Removed MuxFileIo API to avoid naming clashes
73 *
74 * 11   4/6/11 4:53p vsilyaev
75 * SW7425-232: Added MuxFile interface
76 *
77 * 10   5/11/10 2:16p jtna
78 * SW7125-311: Coverity Defect ID:20334 CHECKED_RETURN
79 *
80 * 9   4/2/10 2:37p erickson
81 * SW7405-3833: nexus_file_pvr.h is now part of the public API
82 *
83 * 8   10/9/09 11:28a vsilyaev
84 * SW7405-3126: Continue write operatrion until entire block is written
85 *
86 * 7   1/20/09 10:52a erickson
87 * PR51175: fix NEXUS_FilePlay_Cancel. NEXUS_File_P_Item was getting
88 *  processed twice.
89 *
90 * 6   12/30/08 6:50p vsilyaev
91 * PR 50582: Fixed condition to spawn a timer
92 *
93 * 5   12/29/08 4:53p erickson
94 * PR50738: fix malloc size
95 *
96 * 4   12/11/08 2:05p erickson
97 * PR47854: clarify warning in NEXUS_FilePlay_Cancel
98 *
99 * 3   11/18/08 4:55p erickson
100 * PR47854: added NEXUS_FilePlay_Cancel to recover from hung threads due
101 *  to bad I/O
102 *
103 * 2   4/2/08 11:31a erickson
104 * PR40198: fix DEBUG=n warning
105 *
106 * 1   1/18/08 2:16p jgarrett
107 * PR 38808: Merging to main branch
108 *
109 * Nexus_Devel/4   11/7/07 5:34p vsilyaev
110 * PR 36788: Updated file API
111 *
112 * Nexus_Devel/3   10/15/07 5:10p vsilyaev
113 * PR 35824: Fixed queue operations
114 *
115 * Nexus_Devel/2   10/12/07 5:11p vsilyaev
116 * PR 35824: Added O_DIRECT flag, fixed lock/unlock order
117 *
118 * Nexus_Devel/1   10/10/07 3:57p vsilyaev
119 * PR 35824: File I/O module
120 *
121 * $copied_brcm_Log: /BSEAV/api/src/pvr/bsettop_fileio.c $
122 * $copied_brcm_Revision: 38 $
123 * $copied_brcm_Date: 9/24/07 3:52p $
124 **************************************************************************/
125#include "nexus_file_module.h"
126#include "bfile_io.h"
127#include "nexus_file_pvr.h"
128#include "nexus_file_muxio.h"
129
BDBG_MODULE(nexus_file_scheduler);
/* Per-request trace messages used throughout this file. Currently an alias for
 * BDBG_MSG, so they are emitted at the same debug level as ordinary messages. */
#define BDBG_MSG_TRACE(x) BDBG_MSG(x)
132
/* One asynchronous I/O request.
 * At any time an item is either linked through 'list' into one of the scheduler
 * queues (free, queued, completed), or referenced by
 * NEXUS_File_P_IoWorker.current while a worker performs its I/O. */
struct NEXUS_File_P_Item {
    BLST_S_ENTRY(NEXUS_File_P_Item) list;
    /* selects which member of 'io' is valid */
    enum {
      ioType_Read,
      ioType_Write,
      ioType_MuxRead,
      ioType_MuxWrite
    } ioType;
    union {
        struct {
            bfile_io_read_t fd;
        } read;
        struct {
            bfile_io_write_t fd;
        } write;
        struct {
            bfile_io_mux_t fd;
            off_t offset;
        } muxRead;
        struct {
            bfile_io_mux_t fd;
            off_t offset;
        } muxWrite;
    } io;
    void *buf; /* caller's buffer pointer (data is not copied); for mux I/O it points to the atom array */
    size_t length; /* byte count for read/write; number of atoms for mux I/O */
    void *cntx; /* opaque context handed back to 'callback' */
    NEXUS_File_Callback callback; /* invoked as callback(cntx, result) while 'module' is locked */
    NEXUS_ModuleHandle module; /* module whose lock is taken (TryLock) around the callback */
    ssize_t result; /* initialized to -1; set to the I/O return value, or -1 when the request is cancelled */
};
164
/* Singly-linked list of request items; used for the free, queued and completed lists. */
BLST_S_HEAD(NEXUS_File_P_Queue, NEXUS_File_P_Item);

/* State of one I/O worker thread. */
struct NEXUS_File_P_IoWorker {
    NEXUS_ThreadHandle thread;
    unsigned no; /* worker index; used in the thread name and in debug messages */
    struct NEXUS_File_P_Item *current; /* item whose I/O is in progress; NULL when idle or after NEXUS_FilePlay_Cancel took the item away */
};
172
/* Global state of the file I/O scheduler. Worker threads manipulate these
 * fields while holding the file module lock (NEXUS_LockModule). */
struct NEXUS_File_P_Scheduler {
    NEXUS_FileModuleSettings settings; /* copy of the settings given to NEXUS_File_P_Scheduler_Start */
    BKNI_EventHandle signal; /* signal to wakeup the thread */
    unsigned kill_count; /* number of worker threads that still have to exit (set by NEXUS_File_P_Scheduler_Stop) */
    unsigned workerThreads; /* number of used entries in workers[] */
    struct NEXUS_File_P_Queue free, queued, completed; /* unused items / pending requests / finished requests whose callback is not delivered yet */
    struct NEXUS_File_P_Item *items; /* backing array of settings.maxQueuedElements items */
    NEXUS_TimerHandle timer; /* pending retry timer for the completed queue, NULL if none */
    struct NEXUS_File_P_IoWorker *workers[NEXUS_FILE_MAX_IOWORKERS];/* worker threads to run */
#define MAX_ZOMBIES 10
    struct NEXUS_File_P_IoWorker *zombies[MAX_ZOMBIES]; /* cancelled workers whose thread is exiting; released by NEXUS_File_P_CleanupZombies */
};

static struct NEXUS_File_P_Scheduler g_NEXUS_File_P_Scheduler;

/* Shorthand for the single global scheduler instance. */
#define pScheduler (&g_NEXUS_File_P_Scheduler)

static void NEXUS_P_File_Scheduler_Thread(void *w);
191
#if 0
/* Debug helper (compiled out): number of items linked into 'queue'. */
static unsigned NEXUS_File_P_CountItems(const struct NEXUS_File_P_Queue *queue)
{
    const struct NEXUS_File_P_Item *e;
    unsigned count = 0;

    for (e = BLST_S_FIRST(queue); e; e = BLST_S_NEXT(e, list)) {
        count++;
    }
    return count;
}

/* Debug helper (compiled out): prints how many items sit in each scheduler queue.
 * It takes no lock itself; the caller is responsible for consistency. */
static void print_queues(void)
{
    BKNI_Printf("free %u, ", NEXUS_File_P_CountItems(&pScheduler->free));
    BKNI_Printf("queue %u, ", NEXUS_File_P_CountItems(&pScheduler->queued));
    BKNI_Printf("completed %u\n", NEXUS_File_P_CountItems(&pScheduler->completed));
}
#endif
217
218void
219NEXUS_FilePlay_Cancel(NEXUS_FilePlayHandle file)
220{
221    unsigned i;
222    for (i=0;i<pScheduler->workerThreads;i++) {
223        struct NEXUS_File_P_IoWorker *worker = pScheduler->workers[i];
224        if (worker->current && worker->current->ioType == ioType_Read && worker->current->io.read.fd == file->file.data) {
225            char name[16];
226
227            BDBG_WRN(("Cancelling File worker %p", pScheduler->workers[i]));
228            /* this item's callback must be called with a failure code in order for other sw state to unwind. */
229
230            /* the new worker gets the item, but the item has failed and should be added to the completed queue now. */
231            pScheduler->workers[i] = BKNI_Malloc(sizeof(struct NEXUS_File_P_IoWorker));
232            pScheduler->workers[i]->no = worker->no;
233            pScheduler->workers[i]->current = NULL; /* don't give the item to the new worker. it will be serviced from NEXUS_P_File_TryCompleted. */
234            worker->current->result = -1;
235            BLST_S_INSERT_HEAD(&pScheduler->completed, worker->current, list);
236
237            /* the cancelled worker no longer owns the item */
238            worker->current = NULL;
239
240            BKNI_Snprintf(name, sizeof(name), "nx_io_worker%u", i);
241            pScheduler->workers[i]->thread = NEXUS_Thread_Create(name, NEXUS_P_File_Scheduler_Thread, pScheduler->workers[i], &pScheduler->settings.schedulerSettings[i]);
242            if(!pScheduler->workers[i]->thread) {
243                BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
244                /* no recovery */
245            }
246            /* only one can be active at a time */
247            return;
248        }
249    }
250    BDBG_WRN(("NEXUS_FilePlay_Cancel unable to find a pending NEXUS_FilePlayHandle %p. Are you sure I/O is hung on this file?", file));
251}
252
253static void NEXUS_File_P_CleanupZombies(void)
254{
255    int i;
256    for (i=0;i<MAX_ZOMBIES;i++) {
257        if (pScheduler->zombies[i]) {
258            BDBG_WRN(("Cleaned up zombie %p [%d]", pScheduler->zombies[i], i));
259            NEXUS_Thread_Destroy(pScheduler->zombies[i]->thread);
260            BKNI_Free(pScheduler->zombies[i]);
261            pScheduler->zombies[i] = NULL;
262        }
263    }
264}
265
266static void NEXUS_File_P_AddZombie(struct NEXUS_File_P_IoWorker *worker)
267{
268    int i;
269    for (i=0;i<MAX_ZOMBIES;i++) {
270        if (!pScheduler->zombies[i]) {
271            pScheduler->zombies[i] = worker;
272            return;
273        }
274    }
275    BDBG_ERR(("Unable to keep zombie. NEXUS_ThreadHandle leak."));
276}
277
278void
279NEXUS_File_AsyncRead(NEXUS_FileReadHandle fd, void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
280{
281    struct NEXUS_File_P_Item *e;
282    BDBG_ASSERT(fd);
283    BDBG_ASSERT(buf);
284    BDBG_ASSERT(module);
285    BDBG_ASSERT(callback);
286
287    NEXUS_File_P_CleanupZombies();
288
289    e = BLST_S_FIRST(&pScheduler->free);
290    if(e) {
291        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
292        e->ioType = ioType_Read;
293        e->io.read.fd = fd;
294        e->buf = buf;
295        e->length = length;
296        e->cntx = cntx;
297        e->module = module;
298        e->callback = callback;
299        e->result = -1;
300        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
301        BDBG_MSG_TRACE(("rd: queue %#lx", (unsigned long)e));
302        BKNI_SetEvent(pScheduler->signal);
303        return;
304    } else {
305        BDBG_WRN(("NEXUS_File_AsyncRead: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
306        callback(cntx, -1);
307        return;
308    }
309}
310
311void
312NEXUS_File_AsyncWrite(NEXUS_FileWriteHandle fd, const void *buf, size_t length, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
313{
314    struct NEXUS_File_P_Item *e;
315    BDBG_ASSERT(fd);
316    BDBG_ASSERT(buf);
317    BDBG_ASSERT(module);
318    BDBG_ASSERT(callback);
319
320    NEXUS_File_P_CleanupZombies();
321
322    e = BLST_S_FIRST(&pScheduler->free);
323    if(e) {
324        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
325        e->ioType = ioType_Write;
326        e->io.write.fd = fd;
327        e->buf = (void *)buf;
328        e->length = length;
329        e->cntx = cntx;
330        e->module = module;
331        e->callback = callback;
332        e->result = -1;
333        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
334        BDBG_MSG_TRACE(("wr: queue %#lx", (unsigned long)e));
335        BKNI_SetEvent(pScheduler->signal);
336        return;
337    } else {
338        BDBG_WRN(("NEXUS_File_AsyncWrite: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)length));
339        callback(cntx, -1);
340        return;
341    }
342    return;
343}
344
345/* calls all completed callbacks */
346static void
347NEXUS_P_File_TryCompleted(struct NEXUS_File_P_Scheduler *sched)
348{
349    if(BLST_S_FIRST(&pScheduler->completed)) {
350        struct NEXUS_File_P_Item *e;
351        struct NEXUS_File_P_Queue completed = sched->completed; /* save copy of completed queue */
352        BLST_S_INIT(&sched->completed); /* clear completed queue */
353        while( NULL!=(e=BLST_S_FIRST(&completed))) {
354            struct NEXUS_File_P_Queue *queue;
355            bool locked;
356
357            BLST_S_REMOVE_HEAD(&completed, list);
358            NEXUS_UnlockModule();
359            locked = NEXUS_Module_TryLock(e->module);
360            queue = &sched->completed;
361            if(locked) {
362                queue = &sched->free;
363                /* call callback and add into the free list */
364                e->callback(e->cntx, e->result);
365                NEXUS_Module_Unlock(e->module);
366            }
367            NEXUS_LockModule();
368            BLST_S_INSERT_HEAD(queue, e, list);
369        }
370    }
371    return;
372}
373
374static void
375NEXUS_P_File_CallCompleted(void *s)
376{
377    struct NEXUS_File_P_Scheduler *sched = s;
378    BDBG_MSG_TRACE(("NEXUS_P_File_CallCompleted: %#lx", (unsigned long)s));
379    sched->timer = NULL;
380    NEXUS_P_File_TryCompleted(sched);
381    if(BLST_S_FIRST(&pScheduler->completed)) {
382        sched->timer = NEXUS_ScheduleTimer(10, NEXUS_P_File_CallCompleted, sched);
383    }
384    return;
385}
386
387
388static NEXUS_Error
389NEXUS_P_File_FindAndDeque(struct NEXUS_File_P_Scheduler *sched, struct NEXUS_File_P_IoWorker *worker)
390{
391    int best_priority = 0;
392    int priority = 0;
393    struct NEXUS_File_P_Item *e= BLST_S_FIRST(&sched->queued);
394    struct NEXUS_File_P_Item *active = e;
395    struct NEXUS_File_P_Item *prev_active = NULL; /* keep prev around for easier delete */
396    struct NEXUS_File_P_Item *prev = NULL;
397    ssize_t size = 0;
398    struct NEXUS_File_P_Queue *queue;
399    bool locked;
400
401    BSTD_UNUSED(worker);
402    if(!e) {
403        NEXUS_UnlockModule(); /* drop lock */
404        BDBG_MSG(("%u: queue is empty", worker->no));
405        /* coverity[check_return] */
406        BKNI_WaitForEvent(sched->signal, 100); /* wait for 100 ms */
407        NEXUS_LockModule(); /* restore lock */
408        return 0;
409    }
410    BDBG_MSG_TRACE(("%u: ready %#lx", worker->no, (unsigned long)e));
411    do {
412        switch(e->ioType) {
413        case ioType_Write:
414            BDBG_ASSERT(e->io.write.fd);
415            priority = e->io.write.fd->priority.get(e->io.write.fd->priority.cntx);
416            BDBG_MSG_TRACE(("file[wr] %#x priority %d", e->io.write.fd, priority));
417            break;
418        case ioType_Read:
419            BDBG_ASSERT(e->io.read.fd);
420            priority = e->io.read.fd->priority.get(e->io.read.fd->priority.cntx);
421            BDBG_MSG_TRACE(("file[rd] %#x priority %d", e->io.write.fd, priority));
422            break;
423        case ioType_MuxWrite:
424            BDBG_ASSERT(e->io.read.fd);
425            priority = e->io.muxWrite.fd->priority.get(e->io.muxWrite.fd->priority.cntx);
426            BDBG_MSG_TRACE(("file[muxWr] %#x priority %d", e->io.muxWrite.fd, priority));
427            break;
428        case ioType_MuxRead:
429            BDBG_ASSERT(e->io.read.fd);
430            priority = e->io.muxRead.fd->priority.get(e->io.muxRead.fd->priority.cntx);
431            BDBG_MSG_TRACE(("file[muxRd] %#x priority %d", e->io.muxRead.fd, priority));
432            break;
433        default:
434            BDBG_ASSERT(0);
435            break;
436        }
437        if(best_priority<=priority) { /* since we have LIFO, compensate for it, by giving more priority to last items */
438            active = e;
439            prev_active = prev;
440            best_priority = priority;
441        }
442        prev = e;
443    } while(NULL!=(e = BLST_S_NEXT(e, list)));
444    BDBG_ASSERT(active);
445    /* remove from the queued list */
446    e=active;
447    worker->current = e;
448    BDBG_MSG_TRACE(("%u: dequeue %#lx", worker->no, (unsigned long)e));
449    if(prev_active) {
450        BLST_S_REMOVE_NEXT(&sched->queued, prev_active, list);
451    } else {
452        BLST_S_REMOVE_HEAD(&sched->queued, list);
453    }
454    NEXUS_UnlockModule(); /* drop lock */
455    switch(e->ioType) {
456    case ioType_Write:
457        {
458        size_t write_size = e->length;
459        BDBG_ASSERT(e->io.write.fd);
460        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:wr]:+ %#lx %#lx %u %#lx %#lx", worker->no, (unsigned long)e->io.write.fd, (unsigned long)e->buf, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
461#if 0
462        if(e->length/2>4096) {
463            write_size = e->length/2 ;
464            write_size -= write_size%4096;
465        }
466#endif
467        size = e->io.write.fd->write(e->io.write.fd, e->buf, write_size);
468        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:wr]:- %#lx %#lx %u->%d %#lx %#lx", worker->no, (unsigned long)e->io.write.fd, (unsigned long)e->buf, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
469        break;
470        }
471    case ioType_Read:
472        BDBG_ASSERT(e->io.read.fd);
473        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:rd]:+ %#lx %#lx %u %#lx %#lx", worker->no, (unsigned long)e->io.read.fd, (unsigned long)e->buf, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
474        size = e->io.read.fd->read(e->io.read.fd, e->buf, e->length);
475
476#if 0
477/* PR 47854 - Test code to simulate an I/O error. Playback will cancel the thread and File should recover.
478To give a realistic test, we must use random time. The max hang time must be sometimes greater than the
479NEXUS_Playback_P_CheckWaitingIo timeout. */
480{
481    static int count = 0;
482    static int next_hang = 100;
483    if (++count % next_hang == 0) {
484        unsigned hang_time = rand() % 8000;
485        BDBG_WRN(("begin induced %d hang in worker %p", hang_time, worker));
486        BKNI_Sleep(hang_time);
487        BDBG_WRN(("end induced hang in worker %p", worker));
488        next_hang = (rand() % 200) + 1;
489    }
490}
491#endif
492        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:rd]:- %#lx %#lx %u->%d %#lx %#lx", worker->no, (unsigned long)e->io.read.fd, (unsigned long)e->buf, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
493        break;
494    case ioType_MuxWrite:
495        BDBG_ASSERT(e->io.muxWrite.fd);
496        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxWr]:+ %#lx %#lx %lu:%u %#lx %#lx", worker->no, (unsigned long)e->io.muxWrite.fd, (unsigned long)e->buf, (unsigned long)e->io.muxWrite.offset, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
497        size = e->io.muxWrite.fd->write(e->io.muxWrite.fd, e->io.muxWrite.offset, (const NEXUS_FileMuxIoWriteAtom *)e->buf, e->length);
498        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxWr]:- %#lx %#lx %lu:%u->%d %#lx %#lx", worker->no, (unsigned long)e->io.muxWrite.fd, (unsigned long)e->buf, (unsigned long)e->io.muxWrite.offset, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
499        break;
500    case ioType_MuxRead:
501        BDBG_ASSERT(e->io.muxRead.fd);
502        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxRd]:+ %#lx %#lx %lu:%u %#lx %#lx", worker->no, (unsigned long)e->io.muxRead.fd, (unsigned long)e->buf, (unsigned long)e->io.muxRead.offset, e->length, (unsigned long)e->callback, (unsigned long)e->cntx));
503        size = e->io.muxWrite.fd->read(e->io.muxRead.fd, e->io.muxRead.offset, (const NEXUS_FileMuxIoReadAtom *)e->buf, e->length);
504        BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque[%u:muxRd]:- %#lx %#lx %lu:%u->%d %#lx %#lx", worker->no, (unsigned long)e->io.muxRead.fd, (unsigned long)e->buf, (unsigned long)e->io.muxRead.offset, e->length, size, (unsigned long)e->callback, (unsigned long)e->cntx));
505        break;
506    default:
507        BDBG_ASSERT(0);
508        break;
509    }
510
511    /* Do not dereference e until verifying worker->current. It may not be owned by this thread any more. */
512    if (!worker->current) {
513        NEXUS_LockModule();
514        /* this worker was cancelled */
515        BDBG_WRN(("Cancelled io completed. Worker %p is now a zombie.", worker));
516        NEXUS_File_P_AddZombie(worker);
517        return -1; /* this will cause the thread's loop to terminate */
518    }
519
520    e->result = size;
521    locked = NEXUS_Module_TryLock(e->module);
522    BDBG_MSG_TRACE(("NEXUS_P_File_FindAndDeque: %#lx:%#lx %slocked", (unsigned long)sched, (unsigned long)e, locked?"":"not"));
523
524    queue = &sched->completed;
525    if(locked) {
526        worker->current = NULL;
527        queue = &sched->free;
528        /* call callback and add into the free list */
529        e->callback(e->cntx, size);
530        NEXUS_Module_Unlock(e->module);
531    }
532    NEXUS_LockModule();
533    worker->current = NULL;
534    BLST_S_INSERT_HEAD(queue, e, list);
535    if(queue==&sched->completed && sched->timer==NULL) {
536        sched->timer = NEXUS_ScheduleTimer(10, NEXUS_P_File_CallCompleted, sched);
537    }
538
539    return 0;
540}
541
542static void
543NEXUS_P_File_Scheduler_Thread(void *w)
544{
545    struct NEXUS_File_P_IoWorker *worker=w;
546    struct NEXUS_File_P_Scheduler *sched=pScheduler;
547
548    BDBG_MSG(("NEXUS_P_File_Scheduler_Thread: %u started", (unsigned long)worker));
549    NEXUS_LockModule();
550    for(;;) {
551        if(sched->kill_count>0) {
552            sched->kill_count--;
553            break;
554        }
555        NEXUS_P_File_TryCompleted(sched);
556        if (NEXUS_P_File_FindAndDeque(sched, worker)) {
557            goto zombie;
558        }
559    }
560    NEXUS_UnlockModule();
561    BDBG_MSG(("NEXUS_P_File_Scheduler_Thread: %u stopped", (unsigned long)worker));
562    return;
563
564zombie:
565    NEXUS_UnlockModule();
566    BDBG_MSG(("NEXUS_P_File_Scheduler_Thread zombie %p stopped", (unsigned long)worker));
567    return;
568}
569
570NEXUS_Error
571NEXUS_File_P_Scheduler_Start(const NEXUS_FileModuleSettings *settings)
572{
573    unsigned i;
574    BERR_Code rc;
575
576    BDBG_ASSERT(settings);
577    if(settings->workerThreads >= NEXUS_FILE_MAX_IOWORKERS || settings->workerThreads<1) { return BERR_TRACE(NEXUS_NOT_SUPPORTED); }
578
579    pScheduler->workerThreads = settings->workerThreads;
580    pScheduler->settings = *settings;
581    pScheduler->kill_count = 0;
582    pScheduler->timer = NULL;
583    BLST_S_INIT(&pScheduler->free);
584    BLST_S_INIT(&pScheduler->queued);
585    BLST_S_INIT(&pScheduler->completed);
586    BKNI_Memset(pScheduler->zombies, 0, sizeof(pScheduler->zombies));
587    pScheduler->items = BKNI_Malloc(sizeof(*pScheduler->items)*settings->maxQueuedElements);
588    if(!pScheduler->items) { rc = BERR_TRACE(NEXUS_OUT_OF_SYSTEM_MEMORY); goto err_alloc;}
589    for(i=settings->maxQueuedElements;i>0;i--) { /* insert elements in the reverse order */
590        BLST_S_INSERT_HEAD(&pScheduler->free, &pScheduler->items[i-1], list);
591    }
592    rc = BKNI_CreateEvent(&pScheduler->signal);
593    if(rc!=BERR_SUCCESS) { rc = BERR_TRACE(rc); goto err_event;}
594    for(i=0;i<pScheduler->workerThreads;i++) {
595        pScheduler->workers[i] = BKNI_Malloc(sizeof(struct NEXUS_File_P_IoWorker));
596        pScheduler->workers[i]->no = i;
597        pScheduler->workers[i]->thread = NULL;
598        pScheduler->workers[i]->current = NULL;
599    }
600    for(i=0;i<pScheduler->workerThreads;i++) {
601        char name[16];
602        BKNI_Snprintf(name, sizeof(name), "nx_io_worker%u", i);
603        pScheduler->workers[i]->thread = NEXUS_Thread_Create(name, NEXUS_P_File_Scheduler_Thread, pScheduler->workers[i], &settings->schedulerSettings[i]);
604        if(!pScheduler->workers[i]->thread) {
605            rc=BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
606            /* there is no way to stop single thread, so do generic stop */
607            NEXUS_File_P_Scheduler_Stop();
608            goto error;
609        }
610    }
611    return BERR_SUCCESS;
612
613err_event:
614    BKNI_Free(pScheduler->items);
615err_alloc:
616error:
617    return rc;
618}
619
620void
621NEXUS_File_P_Scheduler_Stop(void)
622{
623    unsigned i;
624
625    NEXUS_LockModule();
626    pScheduler->kill_count = 0;
627    /* we need to account for the case when not all threads were initialized */
628    for(i=0;i<pScheduler->workerThreads;i++) {
629        if(!pScheduler->workers[i]->thread) {
630            break;
631        }
632        pScheduler->kill_count++;
633    }
634    for(;;) {
635        unsigned count = pScheduler->kill_count;
636        NEXUS_UnlockModule();
637        if(count==0) {
638            break;
639        }
640        for(i=0;i<count;i++) {
641            BDBG_MSG(("NEXUS_File_P_Scheduler_Stop: waking thread %u out of %u", i, count));
642            BKNI_SetEvent(pScheduler->signal);
643            BKNI_Sleep(10); /* sleep 10 ms to let worker wake-up */
644        }
645        NEXUS_LockModule();
646    }
647    for(i=0;i<pScheduler->workerThreads;i++) {
648        if(!pScheduler->workers[i]->thread) {
649            break;
650        }
651        NEXUS_Thread_Destroy(pScheduler->workers[i]->thread);
652        BKNI_Free(pScheduler->workers[i]);
653    }
654    NEXUS_File_P_CleanupZombies();
655    if(pScheduler->timer) {
656        NEXUS_CancelTimer(pScheduler->timer);
657    }
658    BKNI_DestroyEvent(pScheduler->signal);
659    BKNI_Free(pScheduler->items);
660    return;
661}
662
663void
664NEXUS_File_AsyncMuxWrite(NEXUS_MuxFileIoHandle fd, off_t offset, const NEXUS_FileMuxIoWriteAtom *atoms, size_t atom_count, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
665{
666    struct NEXUS_File_P_Item *e;
667    BDBG_ASSERT(fd);
668    BDBG_ASSERT(atoms);
669    BDBG_ASSERT(atom_count>0);
670    BDBG_ASSERT(module);
671    BDBG_ASSERT(callback);
672
673    NEXUS_File_P_CleanupZombies();
674
675    e = BLST_S_FIRST(&pScheduler->free);
676    if(e) {
677        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
678        e->ioType = ioType_MuxWrite;
679        e->io.muxWrite.fd = fd;
680        e->io.muxWrite.offset = offset;
681        e->buf = (void *)atoms;
682        e->length = atom_count;
683        e->cntx = cntx;
684        e->module = module;
685        e->callback = callback;
686        e->result = -1;
687        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
688        BDBG_MSG_TRACE(("muxWr: queue %#lx", (unsigned long)e));
689        BKNI_SetEvent(pScheduler->signal);
690        return;
691    } else {
692        BDBG_WRN(("NEXUS_File_AsyncMuxWrite: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)atom_count));
693        callback(cntx, -1);
694        return;
695    }
696    return;
697}
698
699void
700NEXUS_File_AsyncMuxRead(NEXUS_MuxFileIoHandle fd, off_t offset, const NEXUS_FileMuxIoReadAtom *atoms, size_t atom_count, NEXUS_ModuleHandle module, NEXUS_File_Callback callback, void *cntx)
701{
702    struct NEXUS_File_P_Item *e;
703    BDBG_ASSERT(fd);
704    BDBG_ASSERT(atoms);
705    BDBG_ASSERT(atom_count>0);
706    BDBG_ASSERT(module);
707    BDBG_ASSERT(callback);
708
709    NEXUS_File_P_CleanupZombies();
710
711    e = BLST_S_FIRST(&pScheduler->free);
712    if(e) {
713        BLST_S_REMOVE_HEAD(&pScheduler->free, list);
714        e->ioType = ioType_MuxRead;
715        e->io.muxRead.fd = fd;
716        e->io.muxRead.offset = offset;
717        e->buf = (void *)atoms;
718        e->length = atom_count;
719        e->cntx = cntx;
720        e->module = module;
721        e->callback = callback;
722        e->result = -1;
723        BLST_S_INSERT_HEAD(&pScheduler->queued, e, list);
724        BDBG_MSG_TRACE(("muxRd: queue %#lx", (unsigned long)e));
725        BKNI_SetEvent(pScheduler->signal);
726        return;
727    } else {
728        BDBG_WRN(("NEXUS_File_AsyncMuxRead: %#lx %#lx %u not enough descriptors", (unsigned long)cntx, (unsigned long)fd, (unsigned)atom_count));
729        callback(cntx, -1);
730        return;
731    }
732    return;
733}
734
Note: See TracBrowser for help on using the repository browser.