begun working on time-window based dequeueing (and rate limiting in

general)
This commit is contained in:
Rainer Gerhards 2008-04-02 16:53:29 +00:00
parent 38f0cd6762
commit 9b48c4a481
6 changed files with 80 additions and 2 deletions

View File

@ -122,7 +122,7 @@ based framing on syslog/tcp connections</td>
<td valign="top">yes</td>
</tr>
<tr>
<td valign="top">syslog over RELP<br>this is a truely reliable solution (plain tcp syslog can lose messages!)</td>
<td valign="top">syslog over RELP<br>truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why is plain tcp syslog not reliable?</a>)</td>
<td valign="top">yes</td>
<td valign="top">no</td>
</tr>

57
queue.c
View File

@ -55,6 +55,7 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal queueRateLimiter(queue_t *pThis);
static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
@ -341,6 +342,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
@ -1450,6 +1452,60 @@ finalize_it:
}
/* The rate limiter - we only need one - do we?
*
* Here we may wait if a dequeue time window is defined or if we are
* rate-limited. TODO: If we do so, we should also look into the
* way new worker threads are spawned. Obviously, it doesn't make much
* sense to spawn additional worker threads when none of them can do any
* processing. However, it is deemed acceptable to allow this for an initial
* implementation of the timeframe/rate limiting feature.
* Please also note that these feature could also be implemented at the action
* level. However, that would limit them to be used together with actions. We have
* taken the broader approach, moving it right into the queue. This is even
* necessary if we want to prevent spawning of multiple unnecessary worker
* threads as described above. -- rgerhards, 2008-04-02
*
*
* time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format).
* We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy
* we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two
* windows: 0-4; 22-23:59
* so when to run? Let's assume we have 3am
*
* if(tTo < tFrom) {
* if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22])
* do work
* else
* sleep for tFrom - tCurr "hours" [22 - 5 --> 17]
* } else {
* if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10])
* do work
* else
* sleep for tTo - tCurr "hours" [4 - 3 --> 1]
* }
*
* Bottom line: we need to check which type of window we have and need to adjust our
* logic accordingly. Of course, sleep calculations need to be done up to the minute,
* but you get the idea from the code above.
*/
static rsRetVal
queueRateLimiter(queue_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
dbgoprint((obj_t*) pThis, "entering rate limiter\n");
srSleep(2, 0);
finalize_it:
dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet);
RETiRet;
}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
* rgerhards, 2008-01-21
@ -1690,6 +1746,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));

13
queue.h
View File

@ -82,9 +82,20 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
/* rate limiting settings (will be expanded */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
/* dequeue time window settings (may also be expanded) */
int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */
int iDeqtWinToHr; /* end of dequeue time window (hour only) */
/* note that begin and end have specific semantics. It is a big difference if we have
* begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p,
* throughout the night and stop at 4 in the morning. In the first case, it will start
* at 4am, run throughout the day, and stop at 10 in the evening! So far, not logic is
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* ane dequeue time window */
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer

8
wti.c
View File

@ -370,6 +370,14 @@ wtiWorker(wti_t *pThis)
pthread_yield(); /* see big comment in function header */
# endif
/* if we have a rate-limiter set for this worker pool, let's call it. Please
* keep in mind that the rate-limiter may hold us for an extended period
* of time. -- rgerhards, 2008-04-02
*/
if(pWtp->pfRateLimiter != NULL) {
pWtp->pfRateLimiter(pWtp->pUsr);
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);

1
wtp.c
View File

@ -545,6 +545,7 @@ DEFpropSetMeth(wtp, pUsr, void*);
DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));

1
wtp.h
View File

@ -68,6 +68,7 @@ typedef struct wtp_s {
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, int);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
rsRetVal (*pfOnIdle)(void *pUsr, int);