source: svn/trunk/newcon3bcm2_21bu/magnum/syslib/msglib/7552/bmsglib.c @ 52

Last change on this file since 52 was 2, checked in by jglee, 11 years ago

first commit

  • Property svn:executable set to *
File size: 48.8 KB
Line 
1/***************************************************************************
2 *     Copyright (c) 2007-2010, Broadcom Corporation
3 *     All Rights Reserved
4 *     Confidential Property of Broadcom Corporation
5 *
6 *  THIS SOFTWARE MAY ONLY BE USED SUBJECT TO AN EXECUTED SOFTWARE LICENSE
7 *  AGREEMENT  BETWEEN THE USER AND BROADCOM.  YOU HAVE NO RIGHT TO USE OR
8 *  EXPLOIT THIS MATERIAL EXCEPT SUBJECT TO THE TERMS OF SUCH AN AGREEMENT.
9 *
10 * $brcm_Workfile: bmsglib.c $
11 * $brcm_Revision: Hydra_Software_Devel/37 $
12 * $brcm_Date: 4/1/10 2:39p $
13 *
14 * Module Description:
15 *
16 * Revision History:
17 *
18 * $brcm_Log: /magnum/syslib/msglib/7401/bmsglib.c $
19 *
20 * Hydra_Software_Devel/37   4/1/10 2:39p erickson
21 * SW7405-4152: set group->psi_settings.Bank correctly
22 *
23 * Hydra_Software_Devel/36   11/10/09 8:53a gmullen
24 * SW7405-3365: NULL'd p_session pointer if buffer alloc failed
25 *
26 * Hydra_Software_Devel/35   8/19/09 1:01p erickson
27 * PR57768: remove unnecessary BMEM_Heap_FlushCache
28 *
29 * Hydra_Software_Devel/34   7/8/09 2:57p erickson
30 * PR53768: removed BMSGlib_BufferMode. it was never implemented and has
31 * no use.
32 *
33 * Hydra_Software_Devel/33   6/5/09 2:30p erickson
34 * PR55761: limit bufferSize to 512KB
35 *
36 * Hydra_Software_Devel/32   5/8/09 2:36p erickson
37 * PR53390: add BMSGlib_Flush. flush XPT and MSGlib when XPT message error
38 * occurs. this avoids filter group parsing errors due to corruption in
39 * the buffers.
40 *
41 * Hydra_Software_Devel/31   5/6/09 8:22a erickson
42 * PR54867: fix BMSGlib_StopSession for non-PSI filters
43 *
44 * Hydra_Software_Devel/30   4/9/09 1:40p erickson
45 * PR54038: improved ERR message. have BMSGlib_ReadComplete still consume
46 * data if amount_consumed is wrong.
47 *
48 * Hydra_Software_Devel/29   3/24/09 10:01a erickson
49 * PR53516: added BMSGlib_GetStatus
50 *
51 * Hydra_Software_Devel/28   3/18/09 5:13p erickson
52 * PR53214: convert BMSGlib_GetBuffer recursion into goto
53 *
54 * Hydra_Software_Devel/27   3/18/09 4:57p erickson
55 * PR53214: have BMSGlib_ReadComplete automatically handle 4-byte padding.
56 * include a BDBG_WRN so no one's surprised.
57 *
58 * Hydra_Software_Devel/26   3/12/09 5:17p erickson
59 * PR52900: relax check for internally allocated message buffers
60 *
61 * Hydra_Software_Devel/25   3/9/09 11:00a erickson
62 * PR52900: verify that all message filters in a group are using the same
63 * buffer
64 *
65 * Hydra_Software_Devel/24   2/25/09 5:00p erickson
66 * PR52511: remove non-const global data, add proper error checking for
67 * BKNI_Malloc
68 *
69 * Hydra_Software_Devel/23   1/27/09 9:04a erickson
70 * PR51468: make variable static
71 *
72 * Hydra_Software_Devel/22   11/21/08 3:48p erickson
73 * PR48848: dynamically switch message filter group mastership to a
74 * running slave if master stops
75 *
76 * Hydra_Software_Devel/21   6/18/08 1:07p erickson
77 * PR43730: fix warnings by moving param to size_t
78 *
79 * Hydra_Software_Devel/20   5/30/08 6:40p vishk
80 * PR 42684: bsettop_message does not handle PES correctly
81 *
82 * Hydra_Software_Devel/19   5/9/08 1:29p erickson
83 * PR42456: added BMSGlib_Format_ePES_SAVE_ALL. clarified that
84 * maxContiguousMessageSize only applies to PSI messages.
85 *
86 * Hydra_Software_Devel/18   4/14/08 1:16p erickson
87 * PR41730: move user buffer override from Open time to Start time to give
88 * maximum flexibility
89 *
90 * Hydra_Software_Devel/17   3/21/08 11:05a erickson
91 * PR40813: added BMSGlib_SessionParams.bank.
92 *
93 * Hydra_Software_Devel/16   3/10/08 1:27p erickson
94 * PR39836: enforce memory allocation rules for group masters. If the
95 * group master Closes, all other sessions in the group must be Stopped.
96 * Don't allow the group master to restart if there are other sessions
97 * still active in the group.
98 *
99 * Hydra_Software_Devel/15   3/10/08 11:33a erickson
100 * PR39836: added BMSGlib_StopSession to CloseSession
101 *
102 * Hydra_Software_Devel/14   3/10/08 10:08a erickson
103 * PR40321: improve error message if maxContiguousMessageSize is too small
104 *
105 * Hydra_Software_Devel/13   2/26/08 10:29p erickson
106 * PR39781: allow maxContiguousMessageSize == 0 for no data copy on wrap
107 * around
108 *
109 * Hydra_Software_Devel/12   12/5/07 10:33a katrep
110 * PR37217: Added support for 7335.
111 *
112 * Hydra_Software_Devel/11   11/1/07 1:31p erickson
113 * PR36637: use latest transport calls
114 *
115 * Hydra_Software_Devel/10   11/1/07 9:35a erickson
116 * PR36570: added 3563
117 *
118 * Hydra_Software_Devel/9   10/18/07 12:31p erickson
119 * PR35906: rework BMSGlib_StartSession to set rc and dealloc PSI filter
120 * in all failure cases
121 *
122 * Hydra_Software_Devel/8   10/16/07 10:29a jtna
123 * PR35993: Coverity Defect ID:3995 OVERRUN_DYNAMIC
124 *
125 * Hydra_Software_Devel/7   10/15/07 4:11p jtna
126 * PR35993: Coverity Defect ID:3995 OVERRUN_DYNAMIC
127 *
128 * Hydra_Software_Devel/6   9/13/07 1:42p gmullen
129 * PR32868: Changed bmsglib to accept PID channel number, rather than PID
130 * and band combo.
131 *
132 * Hydra_Software_Devel/5   8/7/07 5:22p katrep
133 * PR32868: Fixed message parsing for playback parser.
134 *
135 * Hydra_Software_Devel/4   8/2/07 11:50p vsilyaev
136 * PR 32868: Fixed warnings in release build by improving use of
137 * BERR_TRACE
138 *
139 * Hydra_Software_Devel/3   7/30/07 4:24p erickson
140 * PR32868: remove dependency on tspsi_validate.h
141 *
142 * Hydra_Software_Devel/2   7/26/07 11:11p erickson
143 * PR32868: added more debug and verification
144 *
145 * Hydra_Software_Devel/1   7/26/07 12:35p erickson
146 * PR32868: initial checkin of BMSGlib
147 *
148 ************************************************************************/
149#include "bmsglib.h"
150#include "bkni.h"
151#include "bxpt_interrupt.h"
152
BDBG_MODULE(msglib);

/* Capacity limits. MAX_GROUPS sizes the groups[] table in struct BMSGlib_Impl;
MAX_SESSIONS_PER_GROUP sizes the sessions[] table in struct BMSGlib_Group.
The larger values are selected only for 7038 C0 and later. */
#if (BCHP_CHIP==7038 && BCHP_VER>=BCHP_VER_C0)
#define MAX_GROUPS 100
#define MAX_SESSIONS_PER_GROUP 64
#else
#define MAX_GROUPS 50
#define MAX_SESSIONS_PER_GROUP 32
#endif
/* Upper bound on the number of message bytes handed to the software filter
compare in BMSGlib_process_next_message. */
#define FILTER_SIZE 16

/* Every chip other than 7038 is treated as a RAVE platform. When set,
BMSGlib_GetDefaultSessionParams turns filterGroup on by default. */
#if BCHP_CHIP != 7038
#define B_HAS_RAVE 1
#endif

/* Forward declaration: BMSGlib_Flush is called for error recovery by functions
that appear before its definition. */
static BERR_Code BMSGlib_Flush(BMSGlib_Session_Handle session);

/* Debug object identifiers used by the BDBG_OBJECT_* checks on the two handle types. */
BDBG_OBJECT_ID(BMSGlib);
BDBG_OBJECT_ID(BMSGlib_Session);
172
/* Per-session state. Allocated and zeroed by BMSGlib_OpenSession and released
by BMSGlib_CloseSession. */
struct BMSGlib_Session_Impl {
    BDBG_OBJECT(BMSGlib_Session)
    BMSGlib_Handle msglib; /* owning library instance, set at open time */
    BMSGlib_SessionSettings settings; /* copy of the settings passed to BMSGlib_OpenSession */
    BMSGlib_SessionParams params; /* copy of the params passed to BMSGlib_StartSession */

    bool started;
    unsigned bufferSize; /* actual buffer size in bytes */
    void *buffer; /* this is the buffer that will be used, whether internally allocated or not */

    struct {
        void *buffer; /* if not null, MSGlib allocated this and must free it. */
        unsigned bufferSize; /* size requested for the internal allocation, in bytes */
    } alloc;

    unsigned PidChannelNum;
    unsigned FilterNum;
    unsigned BankNum;

    struct BMSGlib_Group *group; /* this is needed to look up group on api calls */
    struct BMSGlib_Group *dependent_group; /* this session provided its nexus-allocated buffer for the group.
        it cannot be closed or switch pids until all dependent sessions are stopped. */

    int group_read_length; /* if > 0, this msgstream's filter has tested true
        for this msg, so we need to read_complete this total down to 0. This
        value is not guaranteed to be the msg length because of wrap around. */
};
200
/* Although params->filterGroup is optional, all BMSGlib_Sessions use BMSGlib_Groups
to keep the internal implementation simple. A session that does not ask for
grouping simply gets a group of its own (allow_group == false). */
struct BMSGlib_Group {
    BXPT_PsiMessageSettings psi_settings;
    int refcnt; /* number of sessions attached; maintained by BMSGlib_p_add_stream / BMSGlib_p_remove_stream */
    unsigned PidChannelNum; /* pid channel shared by every session in the group */
    BMSGlib_Handle msglib; /* owning library instance, set when a session is added */

    /* Each session is a separate Coeff/Mask/Exclusion set */
    BMSGlib_Session_Handle sessions[MAX_SESSIONS_PER_GROUP]; /* this is needed to look up each stream at isr time */
    BMSGlib_Session_Handle master_session; /* first session added to an empty group; may be handed over in BMSGlib_p_remove_stream */

    bool allow_group; /* if true, this BMSGlib_Group can support >1 session */
    bool just_started;

    int group_read_cnt; /* if > 0, there's a group read in process */

    const void *current_msg; /* pointer to current msg in buffer */
    size_t length; /* remaining length in buffer */
    unsigned current_msg_length; /* this is the length of the next message pointed
        to by group->current_msg, even if the message wraps. */

    unsigned char *contiguous_buffer; /* buffer used for one-message memcpy to guarantee contiguous PSI messages up to a max size */
    bool use_contiguous_buffer; /* if true, contiguous_buffer is in use and has already been completed with XPT */
};
225
/* Library instance. Allocated and zeroed by BMSGlib_Open, freed by BMSGlib_Close. */
struct BMSGlib_Impl {
    BDBG_OBJECT(BMSGlib)
    BMSGlib_Settings settings; /* copy of the settings passed to BMSGlib_Open */
    struct BMSGlib_Group groups[MAX_GROUPS]; /* fixed pool of groups; a group with refcnt == 0 is free */
};
231
232
233void BMSGlib_GetDefaultSettings(BMSGlib_Settings *settings)
234{
235    BKNI_Memset(settings, 0, sizeof(*settings));
236}
237
238BERR_Code BMSGlib_Open(BMSGlib_Handle *handle, const BMSGlib_Settings *settings)
239{
240    if (!settings->mem || !settings->xpt) {
241        return BERR_TRACE(BERR_INVALID_PARAMETER);
242    }
243
244    *handle = (BMSGlib_Handle)BKNI_Malloc(sizeof(struct BMSGlib_Impl));
245    if (!*handle) {
246        return BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
247    }
248    BKNI_Memset(*handle, 0, sizeof(struct BMSGlib_Impl));
249    (*handle)->settings = *settings;
250    BDBG_OBJECT_SET((*handle), BMSGlib);
251
252    return BERR_SUCCESS;
253}
254
/* Destroy a library instance created by BMSGlib_Open. The debug object tag is
invalidated first and the memory is then freed. Nothing here stops or closes
sessions that still reference the instance. */
void BMSGlib_Close(BMSGlib_Handle handle)
{
    BDBG_OBJECT_DESTROY(handle, BMSGlib);
    BKNI_Free(handle);
}
260
261void BMSGlib_GetDefaultSessionSettings(BMSGlib_Handle handle, BMSGlib_SessionSettings *settings)
262{
263    BDBG_OBJECT_ASSERT(handle, BMSGlib);
264    BKNI_Memset(settings, 0, sizeof(*settings));
265    settings->bufferSize = 4*1024;
266    settings->maxContiguousMessageSize = 4*1024;
267}
268
269BERR_Code BMSGlib_OpenSession(BMSGlib_Handle msglib, BMSGlib_Session_Handle *p_session, const BMSGlib_SessionSettings *settings)
270{
271    BMSGlib_Session_Handle session;
272
273    BDBG_OBJECT_ASSERT(msglib, BMSGlib);
274
275    /* callbacks are not required. the app could decide to poll. */
276    /* bufferSize is not required. the app could pass a buffer in at Start time. check then. */
277
278    session = *p_session = (BMSGlib_Session_Handle)BKNI_Malloc(sizeof(*session));
279    if (!session) {
280        return BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
281    }
282    BKNI_Memset(session, 0, sizeof(*session));
283    BDBG_OBJECT_SET(session, BMSGlib_Session);
284    session->msglib = msglib;
285    session->settings = *settings;
286
287    if (settings->bufferSize) {
288        session->alloc.bufferSize = settings->bufferSize;
289        session->alloc.buffer = BMEM_AllocAligned(session->msglib->settings.mem, session->alloc.bufferSize, 10, 0);
290        BDBG_MSG(("Allocated %#x, %d bytes", session->alloc.buffer, session->alloc.bufferSize));
291        if (!session->alloc.buffer) {
292            BKNI_Free(session);
293            *p_session = NULL;
294            return BERR_TRACE(BERR_OUT_OF_DEVICE_MEMORY);
295        }
296    }
297
298    return BERR_SUCCESS;
299}
300
301void BMSGlib_CloseSession(BMSGlib_Session_Handle session)
302{
303    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
304    BMSGlib_StopSession(session);
305
306    if (session->dependent_group) {
307        int i;
308        struct BMSGlib_Group *group = session->dependent_group;
309        /* We have to stop the others to avoid inevitable and unrecoverable memory corruption. */
310        BDBG_WRN(("You are closing the group's master session (%p). All other sessions in the group must be stopped.", session));
311
312        for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
313            if (group->sessions[i]) {
314                BMSGlib_StopSession(group->sessions[i]);
315            }
316        }
317        BDBG_ASSERT(!session->dependent_group);
318    }
319
320    if (session->alloc.buffer) {
321        BMEM_Free(session->msglib->settings.mem, session->alloc.buffer);
322    }
323    BDBG_OBJECT_DESTROY(session, BMSGlib_Session);
324    BKNI_Free(session);
325}
326
327void BMSGlib_GetDefaultSessionParams(BMSGlib_Session_Handle session, BMSGlib_SessionParams *params)
328{
329    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
330    BKNI_Memset(params, 0, sizeof(*params));
331    BKNI_Memset(params->filter.Mask, 0xFF, sizeof(params->filter.Mask));
332    BKNI_Memset(params->filter.Exclusion, 0xFF, sizeof(params->filter.Exclusion));
333/* for RAVE platforms, multiple pid channels on the same band and pid are not
334allowed. an easy way to avoid this is to use filter groups. */
335#if B_HAS_RAVE
336    params->filterGroup = true;
337#endif
338    params->bank = -1; /* automatic selection */
339}
340
341static void BMSGlib_p_dataReady_isr(void *param1, int param2)
342{
343    int i;
344    struct BMSGlib_Group *group = (struct BMSGlib_Group *)param1;
345    BSTD_UNUSED(param2);
346    /* disable until the next BXPT_GetBuffer call */
347    BXPT_Interrupt_DisableMessageInt_isr(group->msglib->settings.xpt, group->PidChannelNum);
348
349    /* everybody gets a callback. we can't know if the first message in
350    the buffer is for a given session at isr time. */
351    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
352        BMSGlib_Session_Handle session = group->sessions[i];
353        if (session && session->settings.dataReadyCallback_isr) {
354            (*session->settings.dataReadyCallback_isr)(session->settings.callbackContext);
355        }
356    }
357}
358
359static void BMSGlib_p_overflow_callback_isr(void *param1, int param2)
360{
361    int i;
362    struct BMSGlib_Group *group = (struct BMSGlib_Group *)param1;
363    BSTD_UNUSED(param2);
364    /* disable until the next BXPT_GetBuffer call */
365    BXPT_Interrupt_DisableMessageOverflowInt_isr(group->msglib->settings.xpt, group->PidChannelNum);
366
367    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
368        BMSGlib_Session_Handle session = group->sessions[i];
369        if (session && session->settings.overflowCallback_isr) {
370            (*session->settings.overflowCallback_isr)(session->settings.callbackContext);
371        }
372    }
373}
374
375static BERR_Code BMSGlib_p_enable_interrupt(BMSGlib_Session_Handle session, bool enabled)
376{
377    BERR_Code rc;
378
379    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
380    if (session->settings.dataReadyCallback_isr) {
381        if (enabled) {
382            rc = BXPT_Interrupt_EnableMessageInt(session->msglib->settings.xpt, session->PidChannelNum,
383                BMSGlib_p_dataReady_isr, session->group, 0);
384            if (rc) return BERR_TRACE(rc);
385        }
386        else {
387            rc = BXPT_Interrupt_DisableMessageInt(session->msglib->settings.xpt, session->PidChannelNum);
388            if (rc) return BERR_TRACE(rc);
389        }
390    }
391    if (session->settings.overflowCallback_isr) {
392        if (enabled) {
393            rc = BXPT_Interrupt_EnableMessageOverflowInt(session->msglib->settings.xpt, session->PidChannelNum,
394                BMSGlib_p_overflow_callback_isr, session->group, 0);
395            if (rc) return BERR_TRACE(rc);
396        }
397        else {
398            rc = BXPT_Interrupt_DisableMessageOverflowInt(session->msglib->settings.xpt, session->PidChannelNum);
399            if (rc) return BERR_TRACE(rc);
400        }
401    }
402    return BERR_SUCCESS;
403}
404
405/**
406Returns used or unused group. Caller should distinguish using refcnt.
407If refcnt is 0, caller is responsible for populating psi_settings.
408Caller is responsible for inc/dec of refcnt.
409**/
410
411static struct BMSGlib_Group *BMSGlib_p_find_group(BMSGlib_Handle msglib, bool allow_group, unsigned PidChannel )
412{
413    int i;
414    struct BMSGlib_Group *unused_group = NULL;
415
416    /* first, try to find one that's already used */
417    for (i=0;i<MAX_GROUPS;i++) {
418        struct BMSGlib_Group *group = &msglib->groups[i];
419        if (group->refcnt && group->allow_group && allow_group) {
420            /* this group is already in use. check if we can join their party. */
421            if (group->PidChannelNum == PidChannel )
422            {
423                return group;
424            }
425        }
426        else if (!group->refcnt && !unused_group) {
427            /* find one unused group in case we don't find one we can join */
428            unused_group = group;
429        }
430    }
431    if (!unused_group) {
432        BDBG_ERR(("No more message filter groups available"));
433    }
434    else {
435        unused_group->allow_group = allow_group;
436    }
437    return unused_group;
438}
439
440static BERR_Code BMSGlib_p_add_stream(struct BMSGlib_Group *group, BMSGlib_Session_Handle session)
441{
442    int i;
443    BERR_Code rc = BERR_SUCCESS;
444
445    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
446    /* No critical section needed here. The danger is that we add a stream to a group
447    but don't get a callback because of a race condition. But we always request
448    a callback after adding subsequent group members at the end of BMSGlib_StartSession.
449    See there for explanation. */
450    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
451        if (!group->sessions[i]) {
452            if (group->refcnt == 0) {
453                BDBG_ASSERT(!group->contiguous_buffer);
454                if (session->settings.maxContiguousMessageSize) {
455                    group->contiguous_buffer = BMEM_AllocAligned(session->msglib->settings.mem, session->settings.maxContiguousMessageSize, 10, 0 );
456                    BDBG_MSG(("Allocated maxContiguousMessage buffer %#x, %d bytes", group->contiguous_buffer, session->settings.maxContiguousMessageSize));
457                    if (!group->contiguous_buffer) {
458                        rc = BERR_TRACE(BERR_OUT_OF_DEVICE_MEMORY);
459                        goto done;
460                    }
461                }
462                BDBG_ASSERT(!group->master_session);
463                group->master_session = session;
464            }
465            /* hook it up */
466            group->msglib = session->msglib;
467            group->sessions[i] = session;
468            group->refcnt++;
469            session->group = group;
470            BDBG_MSG(("add to group %p (refcnt %d)", group, group->refcnt));
471            goto done;
472        }
473    }
474    BDBG_ERR(("No more filter groups available"));
475    rc = BERR_TRACE(BERR_UNKNOWN);
476done:
477    return rc;
478}
479
480static void BMSGlib_p_remove_stream(struct BMSGlib_Group *group, BMSGlib_Session_Handle session)
481{
482    int i;
483
484    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
485    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
486        if (group->sessions[i] == session) {
487            group->sessions[i] = NULL;
488            group->refcnt--;
489
490            if (group->refcnt == 0) {
491                /* the grouping can be undone */
492                if (group->contiguous_buffer) {
493                    BMEM_Free(session->msglib->settings.mem, group->contiguous_buffer);
494                    group->contiguous_buffer = NULL;
495                }
496
497                /* disconnect the master session. it can be safely closed now. */
498                BDBG_ASSERT(group->master_session);
499                group->master_session->dependent_group = NULL;
500                group->master_session = NULL;
501                group->current_msg = NULL;
502            }
503            else {
504                /* throw the mastership to another session in the group if the session doesn't own the buffer
505                   and the new master doesn't own the buffer. if any session in the group owns the buffer,
506                   the whole system is too inflexible. */
507                if (group->master_session == session && !session->alloc.buffer) {
508                    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
509                        if (group->sessions[i] && !group->sessions[i]->alloc.buffer) {
510                            BDBG_MSG(("switching group master from %p to %p", group->master_session, group->sessions[i]));
511                            group->master_session = group->sessions[i];
512                            break;
513                        }
514                    }
515                }
516            }
517
518            session->group = NULL;
519            BDBG_MSG(("remove from group %p (refcnt %d)", group, group->refcnt));
520            return;
521        }
522    }
523    BDBG_ASSERT(0);
524}
525
/*
 * Software version of the XPT message filter test.
 *
 * msg        - start of the message.
 * inclMask   - include mask: a 0 bit means "this bit must equal coef",
 *              a 1 bit means "don't care".
 * exclMask   - exclusion mask: a 0 bit takes part in the exclusion test. The
 *              exclusion test is active only if some byte differs from 0xFF.
 * coef       - coefficient (expected) bytes.
 * filterSize - number of message bytes that may be examined.
 *
 * Message byte 2 is not filtered (BXPT_Filter skips it), so filter byte i is
 * compared with message byte i for i < 2 and with message byte i+1 afterwards.
 * No message byte at index >= filterSize is ever read.
 *
 * Returns true when every included bit matches and, if the exclusion test is
 * active, at least one excluded bit differs from the coefficient.
 */
static bool BMSGlib_P_TS_Filter_Compare(const uint8_t *msg, const uint8_t *inclMask, const uint8_t *exclMask,
    const uint8_t *coef, size_t filterSize )
{
    bool exclEnabled = false;
    bool exclDiffers = false;
    size_t i;
    size_t message_index;

    for( i = 0, message_index = 0; message_index < filterSize; i++, message_index++ )
    {
        /* BXPT_Filter skips the 2nd byte */
        if (message_index == 2) {
            message_index++;
            /* re-check the bound after the skip so msg[filterSize] is never read */
            if (message_index >= filterSize) {
                break;
            }
        }

        if( (inclMask[i] | coef[i]) != (inclMask[i] | msg[message_index]) )
        {
            /* an included bit mismatches: nothing else can make this a match */
            return false;
        }

        if( exclMask[i] != 0xFF )
        {
            exclEnabled = true;
        }

        /* with exclMask[i] == 0xFF both sides are zero, so this only fires for enabled bytes */
        if( (~exclMask[i] & coef[i]) != (~exclMask[i] & msg[message_index]) )
        {
            exclDiffers = true;
        }
    }

    return !exclEnabled || exclDiffers;
}
589
/* Big-endian 16-bit read, and the 12-bit section_length field that starts at
byte 1 of a PSI section. */
#define TS_READ_16( buf ) ((uint16_t)((buf)[0]<<8|(buf)[1]))
#define TS_PSI_SECTION_LENGTH_OFFSET                1
#define TS_PSI_GET_SECTION_LENGTH( buf )    (uint16_t)(TS_READ_16( &(buf)[TS_PSI_SECTION_LENGTH_OFFSET] ) & 0x0FFF)

/*
 * Size in bytes that the message starting at msg occupies in the buffer:
 * section_length plus the 3 header bytes, rounded up to a multiple of 4
 * because the transport hardware pads every message to 32-bit alignment.
 * Reads msg[1] and msg[2] only.
 */
static int TS_Get_Message_Size(const uint8_t *msg)
{
    size_t unpadded = TS_PSI_GET_SECTION_LENGTH(msg) + 3;
    size_t padded = (unpadded + 3) & ~(size_t)3;

    return padded;
}
606
607static bool TS_Validate_Size( const uint8_t *msg, size_t size)
608{
609    size_t current_size = 0;
610    while (current_size + TS_PSI_SECTION_LENGTH_OFFSET + 1 < size) {
611        current_size += TS_Get_Message_Size(&msg[current_size]);
612    }
613    /* In the end, it must be exactly equal */
614    return current_size == size;
615}
616
617/**
618NOTE: There's another way we could have done this. There's an XPT option to append
619every message with a filter bitmask. This is a simpler and more explicit way of checking
a filter match. However, the above code would have to read the whole message
in order to perform the test. Plus, we'd have to pass these filter bitmasks out
623them (which won't work well at high bitrates), or have a flag to indicate that
624a BMSGlib_stream will be a group before the first BMSGlib_start (which changes the
625public API and probably leads to user errors). So, it seems the best option is
626to use this soft filter test, which should work.
627**/
628
629/* Examine the next message in the buffer for group processing */
630static BERR_Code BMSGlib_process_next_message(BMSGlib_Session_Handle session)
631{
632    struct BMSGlib_Group *group = session->group;
633    size_t remainder = 0;
634    int i,filterSize;
635
636    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
637
638    /* we should have data read to be processed */
639    BDBG_ASSERT(group->current_msg && group->length);
640    /* we should not be in the middle of a group read */
641    BDBG_ASSERT(group->group_read_cnt == 0);
642
643    /* We know group->length is large enough to at least read the size
644    because of XPT's 4 byte alignment */
645    BDBG_ASSERT(group->length >= 4);
646    group->current_msg_length = TS_Get_Message_Size(group->current_msg);
647
648    /* determine if the message wraps */
649    if (group->length < group->current_msg_length) {
650        bool MoreDataAvailable;
651        BERR_Code rc;
652        uint8_t *read_ahead_pointer;
653        size_t read_ahead_length;
654
655        BDBG_MSG(("group message wraps around end of buffer, %d bytes, msglen=%d", group->length, group->current_msg_length));
656
657        if (group->current_msg_length > session->settings.maxContiguousMessageSize) {
658            /* options for fixing this: increase maxContiguousMessageSize to accomodate max, or set maxContiguousMessageSize to 0
659            to bypass this copy and require app to piece together messages. */
660            BDBG_ERR(("%d byte message received which is larger than %d maxContiguousMessageSize. It must be discarded.",
661                group->current_msg_length, session->settings.maxContiguousMessageSize));
662            return -1;
663        }
664        if (!group->contiguous_buffer) {
665            /* this should only be NULL if maxContiguousMessageSize is zero */
666            return -1;
667        }
668
669        remainder = group->current_msg_length - group->length;
670
671        /* Copy the first part of the message into the buffer */
672        BKNI_Memcpy(group->contiguous_buffer, group->current_msg, group->length);
673
674        /* This is the trick part, we have to complete the transaction now in order to get more from XPT. */
675        rc = BXPT_UpdateReadOffset( session->msglib->settings.xpt, session->PidChannelNum, group->length);
676        if (rc) {
677            /* flush for error recovery, but still pass on the error code */
678            BMSGlib_Flush(session);
679            return BERR_TRACE(rc);
680        }
681
682        /* The early BXPT_CheckBuffer is no problem because it's non-destructive. */
683        rc = BXPT_CheckBuffer( session->msglib->settings.xpt, session->PidChannelNum,
684            (uint8_t **)&read_ahead_pointer, &read_ahead_length, &MoreDataAvailable);
685        if (rc) return BERR_TRACE(rc);
686
687        /* XPT should only return whole messages, so we should not come up short */
688        if (read_ahead_length < remainder){
689            /* PR 53390 - could be HW buffer corruption. absorb the error. */
690            BDBG_ERR(("read bad message length (got %d, expecting %d) at wrap around", read_ahead_length, remainder));
691            BMSGlib_Flush(session);
692            return 0;
693        }
694
695        BDBG_ASSERT(read_ahead_pointer);
696
697        /* Copy the second part of the message to the end of the buffer. Don't copy more than the message. */
698        BKNI_Memcpy(&group->contiguous_buffer[group->length], read_ahead_pointer, remainder);
699
700        /* Complete this part of the read too */
701        rc = BXPT_UpdateReadOffset( session->msglib->settings.xpt, session->PidChannelNum, remainder);
702        if (rc) {
703            /* flush for error recovery, but still pass on the error code */
704            BMSGlib_Flush(session);
705            return BERR_TRACE(rc);
706        }
707
708        group->use_contiguous_buffer = true;
709        group->current_msg = group->contiguous_buffer;
710    }
711
712    if( group->current_msg_length < FILTER_SIZE ) filterSize = group->current_msg_length;
713    else filterSize = FILTER_SIZE;
714
715    /* compare message with each session's filter. */
716    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
717        BMSGlib_Session_Handle session = group->sessions[i];
718        if (session) {
719            if (BMSGlib_P_TS_Filter_Compare((const uint8_t *)group->current_msg,
720                session->params.filter.Mask, session->params.filter.Exclusion,
721                session->params.filter.Coeficient, filterSize))
722            {
723                session->group_read_length = group->current_msg_length;
724                group->group_read_cnt++;
725                BDBG_MSG(("found message for %p, group_read_length=%d, first_byte=%02x",
726                    session, session->group_read_length, *(const uint8_t *)group->current_msg));
727            }
728            else {
729                session->group_read_length = 0;
730            }
731        }
732    }
733
734    return BERR_SUCCESS;
735}
736
737static void BMSGlib_p_advance_group_buffer(struct BMSGlib_Group *group)
738{
739    /* If buffer is NULL, then we're waiting on XPT
740    If group_read_cnt != 0, then we're still waiting on read_complete's
741    Otherwise, we can advance and send callbacks */
742    if (!group->current_msg || group->group_read_cnt)
743        return;
744
745    /* This assumes group complete when the whole message is processed */
746    group->length -= group->current_msg_length;
747
748    if (!group->length)
749        group->current_msg = NULL;
750    else
751        group->current_msg = (const char *)group->current_msg + group->current_msg_length;
752
753    /* Fire the callbacks. If we have more messages to process
754    in the buffer, this keeps the sessions coming back for more.
755    However, even if we're done with the buffer, we may have bypassed the
756    BXPT_GetBuffer and so we have to do this. This means the group reads
757    will always end with callbacks that result in no reads. That's the way it is. */
758    BKNI_EnterCriticalSection();
759    BMSGlib_p_dataReady_isr(group, 0);
760    BKNI_LeaveCriticalSection();
761
762    /* TODO: performance optimization:
763    if (group->current_msg), we could call BMSGlib_GetBuffer internally,
764    and then only callback those who will receive the next message. The current
765    impl won't hurt. */
766}
767
768BERR_Code BMSGlib_StartSession(BMSGlib_Session_Handle session, const BMSGlib_SessionParams *params)
769{
770    int i;
771    struct BMSGlib_Group *group;
772    BERR_Code rc = BERR_SUCCESS;
773    BXPT_Handle xpt = session->msglib->settings.xpt;
774    bool first_session;
775    bool allocFilter = false;
776
777    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
778    if (session->started) {
779        BMSGlib_StopSession(session);
780    }
781
782    if (session->dependent_group) {
783        BDBG_ERR(("This session is the master of filter group which is current active. You must stop all other members of the group before restarting this session."));
784        return BERR_TRACE(-1);
785    }
786
787    session->params = *params;
788
789    if (params->buffer) {
790        session->buffer = params->buffer;
791        session->bufferSize = params->bufferSize;
792    }
793    else {
794        session->buffer = session->alloc.buffer;
795        session->bufferSize = session->alloc.bufferSize;
796    }
797    /* verify we have a buffer that's usable */
798    if (!session->buffer || session->bufferSize<1024 || ((unsigned)session->buffer % 1024)) {
799        return BERR_TRACE(BERR_INVALID_PARAMETER);
800    }
801
802    BDBG_MSG(( "BMSGlib_StartSession %p, PID channel %u", session, params->PidChannel ));
803
804    group = BMSGlib_p_find_group( session->msglib, params->filterGroup, params->PidChannel );
805    if (!group) return BERR_TRACE(BERR_OUT_OF_SYSTEM_MEMORY);
806
807    first_session = (group->refcnt == 0);
808
809    if (first_session) {
810        BXPT_MessageBufferSize bufferSizeEnum;
811        unsigned bufferSize, temp;
812
813        /* calculate actual bufferSize as a log of 2, up to a max of 512KB */
814        bufferSizeEnum = BXPT_MessageBufferSize_e1kB;
815        bufferSize = 1024;
816        temp = session->bufferSize / 1024;
817        while (temp > 1 && bufferSize < 512*1024) {
818            temp /= 2; /* throw away remainder */
819            bufferSizeEnum++;
820            bufferSize *= 2;
821        }
822        BDBG_ASSERT(bufferSizeEnum <= BXPT_MessageBufferSize_e512kB);
823        if (bufferSize < session->bufferSize) {
824            BDBG_WRN(("only %d out of %d bytes of BMSGlib buffer will be used.", bufferSize, session->bufferSize));
825        }
826
827        BKNI_Memset(group->sessions, 0, sizeof(group->sessions));
828
829        /* If Nexus allocated the buffer, then Nexus must ensure that all uses of the buffer are correct. */
830        if (session->alloc.buffer) {
831            session->dependent_group = group;
832        }
833
834        session->PidChannelNum = params->PidChannel;
835        rc = BXPT_SetPidChannelBuffer( xpt, session->PidChannelNum, session->buffer, bufferSizeEnum);
836        if (rc) { rc=BERR_TRACE(rc); goto error;}
837
838        group->PidChannelNum = session->PidChannelNum;
839    }
840    else {
841        /* TODO: we don't use the buffers on secondary filters. we can't avoid the alloc (done at Open time), but
842        maybe we can WRN the user that the memory is not used so they can alloc less. */
843
844        if( params->format == BMSGlib_Format_ePSI ) {
845            BDBG_ASSERT(session->params.filterGroup);
846            session->PidChannelNum = group->PidChannelNum;
847
848            if (!session->settings.dataReadyCallback_isr) {
849                BDBG_ERR(("dataReadyCallback_isr is required for filtergroups"));
850                rc = BERR_TRACE(BERR_INVALID_PARAMETER);
851                goto error;
852            }
853        }
854        else {
855            BDBG_ERR(("Don't allow message groups for non-PSI filtering"));
856            rc = BERR_TRACE(BERR_INVALID_PARAMETER);
857            goto error;
858        }
859
860        if (params->buffer && params->buffer != group->master_session->buffer) {
861            BDBG_ERR(("All message filters in a group must use the same buffer."));
862            rc = BERR_TRACE(BERR_INVALID_PARAMETER);
863            goto error;
864        }
865    }
866
867    /* We need to add to group before enabling it to prevent race
868    conditions. We've already capture refcnt==0 before this. */
869    rc = BMSGlib_p_add_stream(group, session);
870    if (rc) {rc=BERR_TRACE(rc);goto error;}
871    BDBG_MSG(("add session to group %p, refcnt %d", group, group->refcnt));
872
873    if (session->params.format == BMSGlib_Format_ePSI) {
874        if (session->params.bank != -1) {
875            /* explicit filter selection */
876            rc = BXPT_AllocPSIFilter( xpt, session->params.bank, &session->FilterNum );
877            if (rc == BERR_SUCCESS){
878                session->BankNum = session->params.bank;
879                allocFilter = true;
880            }
881        }
882        else if (!first_session) {
883            /* HW requires  all filters in a group to be on the same bank. */
884            BDBG_ASSERT(group->refcnt && group->master_session);
885            rc = BXPT_AllocPSIFilter( xpt, group->master_session->BankNum, &session->FilterNum );
886            if (rc == BERR_SUCCESS){
887                session->BankNum = group->master_session->BankNum;
888                allocFilter = true;
889            }
890        }
891        else {
892            /* For easy application code, we search for an available bank. */
893            for(i=0; i<BXPT_P_MAX_FILTER_BANKS; i++){
894                rc = BXPT_AllocPSIFilter( xpt, i, &session->FilterNum );
895                if (rc == BERR_SUCCESS){
896                    session->BankNum = i;
897                    allocFilter = true;
898                    break;
899                }
900            }
901        }
902        if (rc) {rc=BERR_TRACE(rc);goto error;}
903
904        rc = BXPT_SetFilter( xpt, session->BankNum, session->FilterNum, &session->params.filter);
905        if (rc) {rc=BERR_TRACE(rc);goto error;}
906
907        if (session->params.filterGroup) {
908#if BCHP_CHIP == 7038
909            rc = BXPT_AddFilterToGroup(xpt, session->FilterNum, &group->psi_settings);
910#else
911            rc = BXPT_AddFilterToGroup(xpt, group->PidChannelNum, session->FilterNum, &group->psi_settings);
912#endif
913            if (rc) {rc=BERR_TRACE(rc);goto error;}
914        }
915    }
916
917    if (first_session) {
918        unsigned Pid, ParserBand;
919        bool IsParserAPlayback;
920
921        group->psi_settings.Bank = session->BankNum;
922        group->psi_settings.FilterEnableMask = 1UL<<session->FilterNum;
923        group->psi_settings.CrcDisable = params->psiCrcDisabled;
924        group->just_started = true;
925
926        rc = BMSGlib_p_enable_interrupt(session, true);
927        if (rc) {rc=BERR_TRACE(rc);goto error;}
928
929        rc = BXPT_GetPidChannelConfig( xpt, session->PidChannelNum, &Pid, &ParserBand, &IsParserAPlayback );
930        if (rc) {rc=BERR_TRACE(rc);goto error;}
931
932        switch(session->params.format) {
933        default:
934        case BMSGlib_Format_ePES:
935        case BMSGlib_Format_eTS:
936        {
937            BXPT_PidChannelRecordSettings ChanSettings;
938
939            BKNI_Memset(&ChanSettings, 0, sizeof(ChanSettings)); /* no XPT init func */
940            ChanSettings.Pid  = Pid;
941            ChanSettings.Band = IsParserAPlayback ? BXPT_PB_PARSER( ParserBand ) : ParserBand;
942            ChanSettings.SaveAllCountType = 0;
943            ChanSettings.SaveAllCount = 10;
944            ChanSettings.ByteAlign = false;
945            switch (session->params.format) {
946            case BMSGlib_Format_eTS:
947                ChanSettings.RecordType = BXPT_SingleChannelRecordType_ePacketSaveAll;
948                break;
949            case BMSGlib_Format_ePES:
950                ChanSettings.RecordType = BXPT_SingleChannelRecordType_ePes;
951                break;
952            default:
953                ChanSettings.RecordType = BXPT_SingleChannelRecordType_ePesSaveAll;
954                break;
955            }
956
957            rc = BXPT_StartPidChannelRecord( xpt, session->PidChannelNum, &ChanSettings );
958            if (rc) {rc=BERR_TRACE(rc); goto error; }
959            break;
960        }
961        case BMSGlib_Format_ePSI:
962            {
963#if B_HAS_RAVE
964                bool override=false;
965                BXPT_PidPsiConfig psiConfig;
966
967                rc = BXPT_GetPidChannelPsiSettings( xpt, session->PidChannelNum, &override, &psiConfig );
968                if (rc) {rc=BERR_TRACE(rc);goto error;}
969
970                psiConfig.PsfCrcDis = params->psfCrcDisabled;
971                override = true;
972
973                rc = BXPT_SetPidChannelPsiSettings( xpt, session->PidChannelNum, override, &psiConfig );
974                if (rc) {rc=BERR_TRACE(rc);goto error;}
975#else
976                if( params->psfCrcDisabled ) {
977                    BDBG_WRN(("Setting psfCrcDisabled is not supported yet" ));
978                }
979#endif
980                group->psi_settings.Pid = Pid;
981                group->psi_settings.Band = IsParserAPlayback ? BXPT_PB_PARSER( ParserBand ) : ParserBand;
982                rc = BXPT_StartPsiMessageCapture( xpt, session->PidChannelNum, &group->psi_settings );
983                if (rc) {rc=BERR_TRACE(rc);goto error;}
984            }
985            break;
986        }
987    }
988
989    session->started = true;
990
991    /* If this session is part of a group, and if this group already has
992    data pending, we need to make sure this session gets a callback. Otherwise
993    we'll be waiting for this session's read_complete, and it may be waiting
994    for a callback. */
995    if (session->group->current_msg && session->settings.dataReadyCallback_isr) {
996        BKNI_EnterCriticalSection();
997        (*session->settings.dataReadyCallback_isr)(session->settings.callbackContext);
998        BKNI_LeaveCriticalSection();
999    }
1000
1001    return BERR_SUCCESS;
1002
1003error:
1004    if (allocFilter)  {
1005        BXPT_FreePSIFilter( xpt, session->BankNum, session->FilterNum );
1006    }
1007    BDBG_ASSERT(rc); /* if the code gets here, it must be a failure */
1008    return rc;
1009}
1010
1011void BMSGlib_StopSession(BMSGlib_Session_Handle session)
1012{
1013    struct BMSGlib_Group *group = session->group;
1014    BXPT_Handle xpt = session->msglib->settings.xpt;
1015
1016    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
1017    if (session->started == false) {
1018        return;
1019    }
1020    BDBG_ASSERT(group);
1021
1022    if (session->group_read_length) {
1023        group->group_read_cnt--;
1024        session->group_read_length = 0;
1025        /* if this was the last one we're waiting on, we need to advance */
1026        BMSGlib_p_advance_group_buffer(group);
1027    }
1028
1029    BMSGlib_p_remove_stream(group, session);
1030
1031    if (group->refcnt == 0) {
1032        BMSGlib_p_enable_interrupt(session, false);
1033        if (session->params.format == BMSGlib_Format_ePSI) {
1034            BXPT_StopPsiMessageCapture(xpt, session->PidChannelNum );
1035        }
1036        else {
1037            BXPT_StopPidChannelRecord(xpt, session->PidChannelNum );
1038        }
1039    }
1040
1041    if (session->params.format == BMSGlib_Format_ePSI) {
1042        if (session->params.filterGroup) {
1043            BXPT_RemoveFilterFromGroup(xpt, session->FilterNum, &group->psi_settings);
1044        }
1045
1046        BXPT_FreePSIFilter(xpt, session->BankNum, session->FilterNum);
1047    }
1048
1049    session->started = false;
1050}
1051
1052/* flush everything that's in the buffer, regardless of group. everything starts over. */
1053static BERR_Code BMSGlib_Flush(BMSGlib_Session_Handle session)
1054{
1055    bool MoreDataAvailable;
1056    unsigned length;
1057    unsigned i;
1058
1059    BDBG_MSG(("BMSGlib_Flush %p", session));
1060    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
1061
1062    do {
1063        uint8_t *buffer;
1064        BERR_Code rc;
1065
1066        rc = BXPT_CheckBuffer( session->msglib->settings.xpt, session->PidChannelNum, &buffer, &length, &MoreDataAvailable);
1067        if (rc) return BERR_TRACE(rc);
1068
1069        /* unconditionally consume everything */
1070        if (length) {
1071            rc = BXPT_UpdateReadOffset( session->msglib->settings.xpt, session->PidChannelNum, length);
1072            if (rc) return BERR_TRACE(rc);
1073        }
1074    } while (MoreDataAvailable || length);
1075
1076    session->group->group_read_cnt = 0;
1077    session->group->current_msg = NULL;
1078    session->group->length = 0;
1079    session->group->current_msg_length = 0;
1080
1081    for (i=0;i<MAX_SESSIONS_PER_GROUP;i++) {
1082        if (session->group->sessions[i]) {
1083            session->group->sessions[i]->group_read_length = 0;
1084        }
1085    }
1086
1087    return 0;
1088}
1089
1090BERR_Code BMSGlib_GetBuffer(BMSGlib_Session_Handle session, const void **buffer, size_t *length)
1091{
1092    bool MoreDataAvailable = false;
1093    int retryCount = 0;
1094    struct BMSGlib_Group *group = session->group;
1095    void *buf_cached;
1096    BERR_Code rc;
1097    BMEM_Heap_Handle mem;
1098
1099    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
1100    BDBG_ASSERT(buffer);
1101    BDBG_ASSERT(length);
1102    mem = session->msglib->settings.mem;
1103
1104    if (session->started == false) {
1105        return BERR_UNKNOWN;
1106    }
1107    BDBG_ASSERT(group);
1108
1109    if(session->params.format != BMSGlib_Format_ePSI) {
1110        /* shortcut for PES/TS record */
1111        rc = BXPT_CheckBuffer( session->msglib->settings.xpt, session->PidChannelNum, (uint8_t **)buffer, length, &MoreDataAvailable);
1112        if (rc) {return BERR_TRACE(rc);}
1113
1114        if (*buffer) {
1115            rc = BMEM_Heap_ConvertAddressToCached(mem, (void*)*buffer, &buf_cached);
1116            if (rc) {return BERR_TRACE(rc);}
1117            *buffer = buf_cached;
1118            if (*length) {
1119                BMEM_Heap_FlushCache(mem, buf_cached, *length);
1120            }
1121        }
1122        /* reenable the callback */
1123        BMSGlib_p_enable_interrupt(session, true);
1124
1125        return BERR_SUCCESS;
1126    }
1127
1128try_next_message:
1129    /* If we're in a group read, don't expand the buffer. We'll have to finish
1130    the read */
1131    if (!group->current_msg) {
1132        rc = BXPT_CheckBuffer( session->msglib->settings.xpt, session->PidChannelNum, (uint8_t **)buffer,
1133            length, &MoreDataAvailable );
1134        if (rc) {return BERR_TRACE(rc);}
1135
1136        if (*buffer) {
1137            rc = BMEM_Heap_ConvertAddressToCached(mem, (void*)*buffer, &buf_cached);
1138            if (rc) {return BERR_TRACE(rc);}
1139            *buffer = buf_cached;
1140            if (*length) {
1141                BMEM_Heap_FlushCache(mem, buf_cached, *length);
1142            }
1143        }
1144
1145        /* Because of potentially bad filtering, we have to validate the first message */
1146        if (*buffer && *length) {
1147            /* If we're the only entry in this group, then apply the soft filter to the first one.
1148            Otherwise, we'll apply a soft filter every time, so there's no need. */
1149            if (group->just_started && group->refcnt == 1) {
1150                if (!BMSGlib_P_TS_Filter_Compare(*buffer, session->params.filter.Mask,
1151                        session->params.filter.Exclusion, session->params.filter.Coeficient,
1152                        sizeof(session->params.filter.Mask))
1153                    ||
1154                    !TS_Validate_Size(*buffer, *length))
1155                {
1156                    /* clear the buffer */
1157                    BDBG_WRN(("Bad message discarded len=%d", *length));
1158                    BMSGlib_ReadComplete(session, *length);
1159                    *length = 0;
1160                }
1161            }
1162            group->just_started = false;
1163        }
1164        /* reenable the callback */
1165        BMSGlib_p_enable_interrupt(session, true);
1166        if (!(*buffer && *length)) {
1167            goto done;
1168        }
1169    }
1170
1171    /* if maxContiguousMessageSize is set, then we must call BMSGlib_process_next_message in order to send
1172    whole messages on wrap around
1173    */
1174    if (session->settings.maxContiguousMessageSize) {
1175        /* this is the first pass in the group read for the entire buffer */
1176        if (!group->current_msg) {
1177            BDBG_MSG(("first pass for group buffer, len %d, pos 0x%x",
1178                *length, *buffer));
1179            group->current_msg = *buffer;
1180            group->length = *length;
1181            BDBG_ASSERT(group->group_read_cnt == 0);
1182        }
1183
1184        /* first pass for this message, so figure out all
1185        the session's that must read the message before we can advance */
1186        if (!group->group_read_cnt) {
1187            BERR_Code rc;
1188
1189            rc = BMSGlib_process_next_message(session);
1190            if (rc) return rc;
1191
1192            /* Because of an internal flush (for error recovery), it's possible that there is no data after a successful process_next_message */
1193            if (!group->current_msg_length) {
1194                *buffer = NULL;
1195                *length = 0;
1196                goto done;
1197            }
1198
1199            BDBG_MSG(("new group message group_read_cnt=%d, msglen=%d", group->group_read_cnt,
1200                group->current_msg_length));
1201            BDBG_ASSERT(group->refcnt >= group->group_read_cnt);
1202
1203            if (!group->group_read_cnt) {
1204                /* no one wants this message. this happens when we close a session
1205                which originally filtered the data but stopped before reading it all out. */
1206                BDBG_MSG(("unused message %d at %p", group->current_msg_length, group->current_msg));
1207
1208                rc = BMSGlib_ReadComplete(session, group->current_msg_length);
1209                if ( rc ) {
1210                  BDBG_ERR(("GetBuffer/ReadComplete got an error %d",rc));
1211                  return rc;
1212                }
1213
1214                /* The following code was found to be needed in one case. It is highly unlikely that an infinite
1215                loop will happen here, especially with the BMSGlib_Flush error recovery. But we're left it in. */
1216                retryCount++;
1217                if ( retryCount > 50 ) {
1218                    BDBG_ERR(("Aborting apparent infinite loop after %d retries", retryCount));
1219                    BMSGlib_Flush(session);
1220                    *buffer = NULL;
1221                    *length = 0;
1222                    return 0;
1223                }
1224
1225                /* at this point, we should try the next message. we will
1226                exit when we find something, or run out of data. */
1227                goto try_next_message;
1228            }
1229        }
1230
1231        /* check if this session is a part of this current group read */
1232        if (session->group_read_length) {
1233            /* remember that this can be called multiple times, so don't
1234            unmark here */
1235            *buffer = group->current_msg;
1236            *length = session->group_read_length;
1237        }
1238        else {
1239            /* there may be data for this session later in the buffer,
1240            but not this message. the key is that every session will be interrupted
1241            again when this message is done */
1242            *buffer = NULL;
1243            *length = 0;
1244        }
1245    }
1246
1247done:
1248    BDBG_MSG(("GetBuffer %p (group %p) buffer=%p length=%d", session, session->group, *buffer, *length));
1249    return BERR_SUCCESS;
1250}
1251
1252BERR_Code BMSGlib_ReadComplete(BMSGlib_Session_Handle session, unsigned amount_consumed)
1253{
1254    struct BMSGlib_Group *group = session->group;
1255    BERR_Code rc;
1256
1257    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
1258    if (session->started == false) {
1259        return BERR_UNKNOWN;
1260    }
1261    BDBG_ASSERT(group);
1262
1263    BDBG_MSG(("ReadComplete %p (group %p): %d out of %d", session, session->group, amount_consumed, session->group_read_length));
1264
1265    if (session->params.format!=BMSGlib_Format_ePSI) {
1266        /* shortcut for PES/TS record */
1267        rc = BXPT_UpdateReadOffset( session->msglib->settings.xpt, session->PidChannelNum, amount_consumed);
1268        if (rc) return BERR_TRACE(rc);
1269        return BERR_SUCCESS;
1270    }
1271
1272    /* In more recent silicon, this 4 byte pad is programmable. But for now the XPT PI defaults it on. */
1273    if (amount_consumed % 4) {
1274        BDBG_WRN(("XPT HW pads messages to 4 byte alignment. Increasing BMSGlib_ReadComplete(%d) to consume that pad.", amount_consumed));
1275        amount_consumed += 4 - (amount_consumed % 4);
1276    }
1277
1278    if (group->group_read_cnt) {
1279        /* make sure we're not completing more than we expected */
1280        if (amount_consumed > (size_t)session->group_read_length) {
1281            BDBG_ERR(("app trying to consume %d, but only %d is available. reducing amount.", amount_consumed, session->group_read_length));
1282            amount_consumed = (size_t)session->group_read_length;
1283        }
1284        if (!session->group_read_length) {
1285            /* this session was not part of the group read, so it's not part of the group_read_cnt. */
1286            return BERR_SUCCESS;
1287        }
1288        session->group_read_length -= amount_consumed;
1289        if (session->group_read_length) {
1290            /* this session is not done */
1291            return BERR_SUCCESS;
1292        }
1293        if (--group->group_read_cnt > 0) {
1294            /* this session is done, but there are more reads of the current
1295            message pending before we're done */
1296            return BERR_SUCCESS;
1297        }
1298    }
1299
1300    /* we're done with the message. */
1301
1302    if (group->use_contiguous_buffer) {
1303        /* in this case, we've already updated XPT, so just clear the state and reread on next get_buffer. */
1304        group->use_contiguous_buffer = false;
1305        group->current_msg_length = 0;
1306        group->length = 0;
1307    }
1308    else {
1309        rc = BXPT_UpdateReadOffset( session->msglib->settings.xpt, session->PidChannelNum, amount_consumed);
1310        if (rc) {
1311            /* flush for error recovery, but still pass on the error code */
1312            BMSGlib_Flush(session);
1313            return BERR_TRACE(rc);
1314        }
1315    }
1316    BMSGlib_p_advance_group_buffer(group);
1317
1318    return BERR_SUCCESS;
1319}
1320
1321BERR_Code BMSGlib_GetStatus( BMSGlib_Session_Handle session, BMSGlib_Status *pStatus )
1322{
1323    BDBG_OBJECT_ASSERT(session, BMSGlib_Session);
1324    if (session->group) {
1325        pStatus->groupMembers = session->group->refcnt;
1326        pStatus->isGroupMaster = (session == session->group->master_session);
1327    }
1328    else {
1329        BKNI_Memset(pStatus, 0, sizeof(*pStatus));
1330    }
1331    return 0;
1332}
Note: See TracBrowser for help on using the repository browser.