Merge pull request #5913 from alorbach/doc-add-dev_action_threads

docs: add developer page "Action Threads and Queue Engine"
This commit is contained in:
Rainer Gerhards 2025-08-13 11:11:19 +02:00 committed by GitHub
commit 04dad1a9dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 190 additions and 5 deletions

View File

@ -0,0 +1,174 @@
.. _dev-action-threads:
.. index::
single: queues; worker threads
single: queues; batching
single: queues; backpressure
single: queues; suspend/resume
Action Threads and Queue Engine (Developer-Oriented)
====================================================
.. page-summary-start
Developer deep-dive into rsyslogs action queues, worker threads, batching, backpressure,
and suspend/resume behavior.
.. page-summary-end
Overview
--------
The rsyslog queue engine connects producers (inputs/rulesets) with consumers (actions),
providing durability, concurrency, backpressure, and error handling. Each action may
use a dedicated queue (or direct mode) and one or more worker threads. The engine
balances throughput and ordering while coping with failures and shutdown.
User-facing queue concepts and parameters are covered in :doc:`../concepts/queues`
and :doc:`../rainerscript/queue_parameters`.
Terminology
-----------
- **Producer**: Component that enqueues a message (main queue, ruleset, or action caller).
- **Action**: Output/processing step (e.g., omfwd, omfile).
- **Action queue**: The queue attached to an action (or direct mode without an explicit queue).
- **Worker thread**: Thread that dequeues batches, executes the action, and finalizes results.
- **Batch**: Group of messages processed as a unit (see ``queue.dequeueBatchSize``).
- **Backpressure**: Throttling behavior when queue reaches high-water marks.
- **Retry/Suspend**: Temporary action pause with backoff when the action fails.
- **DA mode**: Disk-assisted in-memory queue (memory primary, spills to disk on pressure).
- **Disk-only queue**: Persistent queue on disk for maximum durability.
Thread and Queue Topology
-------------------------
- Main message queue feeds rulesets. Rulesets evaluate messages and dispatch to actions.
- For each action:
- **direct** mode: No queue; messages are synchronously handed to the action.
- **memory/disk/DA** queue: Messages enter an action queue processed by N workers.
- Worker threads scale per action (``queue.workerThreads``) and dequeue in batches.
.. mermaid::
flowchart LR
Inputs[Input modules] --> MainQ[Main Message Queue];
MainQ --> Rules[Ruleset Processor];
Rules --> A1Q[Action Queue A1];
Rules --> A2Q[Action Queue A2];
A1Q --> A1W[Workers A1];
A2Q --> A2W[Workers A2];
A1W --> A1Out["Action A1: omfile"];
A2W --> A2Out["Action A2: omfwd"];
Message Processing Lifecycle (Action Queue)
-------------------------------------------
1. Enqueue
- Producer pushes message to the action queue.
- DA/disk queues persist as configured; memory queues hold in RAM.
2. Worker activation
- Idle worker threads wake when items are available.
- Thread count respects ``queue.workerThreads`` (and internal min/max).
3. Dequeue and batch
- Worker dequeues up to ``queue.dequeueBatchSize`` messages (not less than ``queue.minDequeueBatchSize`` unless draining or shutting down).
- Dequeue is FIFO; end-to-end emission may reorder with multiple workers.
4. Execute action
- Worker invokes the action module once per batch or per message depending on the action.
- Action may signal success, transient failure (retry), or fatal failure (discard or suspend).
5. Commit/finalize
- On success, messages are acknowledged and removed (and persisted offsets advanced).
- On transient failure, worker applies backoff/suspend/retry logic; messages remain pending.
- On fatal failure, behavior follows queue/action discard policy.
6. Idle/scale-down
- If the queue is drained, workers may sleep and be reclaimed after timeouts.
7. Shutdown
- On shutdown, workers attempt to drain within ``queue.timeoutShutdown`` and honor persistence settings (e.g., ``queue.saveOnShutdown`` for DA/disk).
Backpressure and Flow Control
-----------------------------
- **Watermarks**:
- High-water mark: When reached, producers slow down or block per timeout settings.
- Low-water mark: Normal operation resumes below this threshold.
- **Discard policy**:
- When near/at capacity, use ``queue.discardMark`` and ``queue.discardSeverity`` to shed lower-priority messages first (if configured).
- **Timeouts**:
- ``queue.timeoutEnqueue``: Producer blocking time when queue is full.
- ``queue.timeoutActionCompletion``: How long a worker waits for the action to complete.
- **Rate limiting**:
- Configurable in some actions to avoid overloading downstreams.
.. mermaid::
stateDiagram-v2
[*] --> Empty
Empty --> Filling: enqueue
Filling --> High: size >= highWatermark
High --> Draining: dequeue/batches
Draining --> Filling: enqueue > dequeue
Draining --> Empty: size == 0
High --> Filling: size < highWatermark
Error Handling, Retry, Suspend
------------------------------
- Transient errors trigger backoff:
- Worker suspends the action for a short interval and retries later (interval may grow).
- Persistent errors:
- Depending on module and settings, move to dead-letter semantics, drop, or keep retrying.
- Disk-backed safety:
- DA and disk queues keep messages across process restarts (subject to sync and checkpoint settings).
Queue Types and Selection
-------------------------
- **direct**: Lowest latency, no buffering; action must keep up or it becomes a bottleneck.
- **in-memory**: High throughput, volatile; messages lost on crash unless DA or disk-backed.
- **disk-assisted in-memory (DA)**: Fast under normal load, durable under bursts.
- **disk-only**: Highest durability, higher latency; best for critical delivery.
Key Parameters (see :doc:`../rainerscript/queue_parameters`)
------------------------------------------------------------
- ``queue.type``: direct, LinkedList (memory), FixedArray (memory), disk, (DA via memory-queue with ``queue.filename`` set).
- ``queue.size``: Capacity in number of messages (memory queues).
- ``queue.dequeueBatchSize`` / ``queue.minDequeueBatchSize``: Batch sizing.
- ``queue.workerThreads``: Max concurrent workers per action.
- ``queue.highWatermark`` / ``queue.lowWatermark``: Backpressure thresholds.
- ``discardMark`` / ``discardSeverity``: Controlled shedding under pressure.
- ``queue.spoolDirectory`` / ``queue.filename``: Disk storage for DA/disk queues.
- ``queue.checkpointInterval`` / ``queue.syncQueueFiles``: Durability and fsync policy.
- ``queue.timeoutEnqueue`` / ``queue.timeoutShutdown`` / ``queue.timeoutActionCompletion``: Timing behavior.
- ``queue.saveOnShutdown``: Persist pending entries at shutdown (DA/disk).
Sequence and Error Paths
------------------------
.. mermaid::
sequenceDiagram
actor P as Producer
participant Q as Action Queue
participant W as Worker Thread
participant A as Action Module
P->>Q: enqueue(msg)
Q-->>W: wake
W->>Q: dequeue(batch)
W->>A: process(batch)
A-->>W: success | transient_error | fatal_error
alt success
W->>Q: commit/remove(batch)
else transient_error
W->>W: backoff/suspend (retry later)
else fatal_error
W->>Q: discard or DLQ policy
end
W-->>Q: next batch or sleep
Developer Notes
---------------
- Batching improves throughput but increases per-message latency; tune batch sizes per action characteristics.
- Parallel workers can reorder across batches; per-queue FIFO is preserved, but global ordering is not.
- Avoid blocking in action code; prefer non-blocking I/O and internal buffering where possible.
- Ensure action modules clearly communicate transient vs. permanent errors to the engine.
Cross-References
----------------
- :doc:`../concepts/queues`
- :doc:`../rainerscript/queue_parameters`
- :doc:`../whitepapers/queues_analogy`

View File

@ -6,15 +6,22 @@ still incomplete. Target audience is developers and users who would like
to get an in-depth understanding of queues as used in
`rsyslog <http://www.rsyslog.com/>`_.
**Please note that this document is outdated and does not longer reflect
the specifics of the queue object. However, I have decided to leave it
in the doc set, as the overall picture provided still is quite OK. I
intend to update this document somewhat later when I have reached the
"store-and-forward" milestone.**
**Note: This document describes historic implementation details and is
partially outdated. For the current developer overview of action threads
and queues, see :doc:`dev_action_threads`.**
*Please note that this document outdated no longer reflects the specifics
of the queue object. However, we have decided to leave it
in the doc set, as the overall picture provided still is quite OK. We
intend to continue update this document somewhat later when we have
clarified some the overall developer doc structure.*
Some definitions
----------------
The term "DA" means "disk-assisted mode" of a queue. Details in the
implementation details.
A queue is DA-enabled if it is configured to use disk-assisted mode when
there is need to. A queue is in DA mode (or DA run mode), when it
actually runs disk assisted.

View File

@ -605,3 +605,7 @@ The following is a sample of a TCP forwarding action with its own queue.
queue.filename="forwarding" queue.size="1000000" queue.type="LinkedList"
)
See also
========
- :ref:`dev-action-threads`