sidecar: add rsyslog impstats exporter

Add a production-ready sidecar exporter to simplify operations and
support consistent deployments across hosts and containers.
Before: no sidecar tooling shipped. After: exporter, scripts, and docs.
Impact: new sidecar defaults to udp on loopback; tests add venv runners.

Implement JSON/CEE/Prometheus parsing with counter/gauge heuristics.
Add UDP burst buffering, size limits, and optional source filtering.
Expose /metrics and /health with parse and drop status reporting.
Provide Dockerfile, docker-compose examples, and systemd install flow.
Add validation and UDP test runners plus sample impstats data.
Document production setup, security posture, and file growth caveats.

With the help of AI-Agents: GitHub Copilot

See also: https://github.com/rsyslog/rsyslog/issues/5824
This commit is contained in:
Andre Lorbach 2026-01-21 08:46:02 +00:00
parent 0c51bc3dde
commit eb7b57ed1d
19 changed files with 2403 additions and 0 deletions

sidecar/.gitignore vendored Normal file

@@ -0,0 +1,41 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg-info/
dist/
build/
*.egg

# Virtual environments
venv/
.venv/
env/
ENV/

# IDE
.vscode/
.idea/
*.swp
*.swo
*~

# OS
.DS_Store
Thumbs.db

# Logs
*.log

# Test artifacts
.pytest_cache/
.coverage
htmlcov/

# Local test files
test-impstats.*

# Scratch files
tmp

sidecar/Dockerfile Normal file

@@ -0,0 +1,68 @@
# rsyslog Prometheus Exporter - Lightweight Sidecar Container
FROM python:3.12-slim

LABEL maintainer="rsyslog project"
LABEL description="Prometheus exporter sidecar for rsyslog impstats"

# Create non-root user
RUN groupadd -r rsyslog && useradd -r -g rsyslog rsyslog

# Set working directory
WORKDIR /app

# Copy requirements first for better layer caching
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy exporter script
COPY rsyslog_exporter.py .

# Make script executable
RUN chmod +x rsyslog_exporter.py

# Create directory for impstats file (will typically be mounted)
RUN mkdir -p /var/log/rsyslog && chown rsyslog:rsyslog /var/log/rsyslog

# Switch to non-root user
USER rsyslog

# Default environment variables (can be overridden)
ENV IMPSTATS_PATH=/var/log/rsyslog/impstats.json
ENV IMPSTATS_FORMAT=json
ENV LISTEN_ADDR=127.0.0.1
ENV LISTEN_PORT=9898
ENV LOG_LEVEL=INFO

# SECURITY NOTE: Default is loopback. Override with LISTEN_ADDR=0.0.0.0 in
# container deployments when you intend to expose /metrics.

# Expose metrics port
EXPOSE 9898
# Health check (respects LISTEN_ADDR/LISTEN_PORT). Heredocs are only supported
# for RUN and COPY instructions, so the check is an inline python3 -c command.
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python3 -c "import http.client, os, sys; \
addr = os.environ.get('LISTEN_ADDR', '127.0.0.1'); \
addr = '127.0.0.1' if addr == '0.0.0.0' else addr; \
conn = http.client.HTTPConnection(addr, int(os.environ.get('LISTEN_PORT', '9898')), timeout=4); \
conn.request('GET', '/health'); \
sys.exit(0 if 200 <= conn.getresponse().status < 300 else 1)" || exit 1
# Run the exporter with gunicorn (production WSGI server)
# Default to a single worker to support UDP mode safely. Increase for file mode if needed.
CMD ["/bin/sh", "-c", "gunicorn --bind ${LISTEN_ADDR}:${LISTEN_PORT} --workers 1 --threads 2 --access-logfile - rsyslog_exporter:application"]

sidecar/README.md Normal file

@@ -0,0 +1,611 @@
# rsyslog Prometheus Exporter
A lightweight Python sidecar that reads rsyslog `impstats` (file or UDP) and exposes Prometheus metrics at `/metrics`, plus a `/health` endpoint for operational checks.
## Features
- **Multiple format support**: JSON, native Prometheus, and CEE/Lumberjack formats
- **Lightweight**: Minimal dependencies, small memory footprint
- **Flexible deployment**: Runs as standalone process, systemd service, or container sidecar
- **Auto-refresh**: Monitors impstats file and reloads when updated
- **Health checks**: Built-in `/health` endpoint for monitoring
- **Production-ready**: Uses gunicorn WSGI server for production deployments
- **Secure defaults**: Binds to localhost, supports source IP filtering
> Note on UDP mode: For stability, UDP mode requires a single Gunicorn worker (`--workers 1`). The provided Dockerfile and systemd unit already default to one worker. Increase workers only when using file mode.
> **⚠️ Production Deployments**: For production environments, **always use the gunicorn WSGI server** instead of the development server.
## Quick Start
If you just want a working setup on a host with rsyslog:
1. Run the installer (Option 2 below).
2. Let it deploy the sample rsyslog config when prompted.
3. Restart rsyslog and scrape `/metrics`.
### Prerequisites
- Python 3.8 or higher
- rsyslog configured with `impstats` module
### Installation Options
#### Option 1: Standalone Python Script (Development/Testing)
> **Note:** This uses the Werkzeug development server. For production, see Option 2 (systemd with gunicorn) below.
1. **Create a venv and install dependencies:**
```bash
cd sidecar
python3 -m venv .venv
./.venv/bin/pip install -r requirements.txt
```
2. **Configure rsyslog** to emit impstats (see [Configuration](#rsyslog-configuration) below)
3. **Run the exporter:**
```bash
export IMPSTATS_MODE=file
export IMPSTATS_PATH=/var/log/rsyslog/impstats.json
export IMPSTATS_FORMAT=json
./.venv/bin/python rsyslog_exporter.py
```
You'll see a warning:
```
************************************************************
DEVELOPMENT MODE - Not suitable for production!
For production, use: gunicorn --workers 1 -b 127.0.0.1:9898 rsyslog_exporter:application
************************************************************
```
4. **Test the endpoint:**
```bash
curl http://localhost:9898/metrics
```
#### Option 2: systemd Service with Gunicorn (Recommended for Production)
1. **Install via the helper script:**
```bash
sudo ./install-service.sh --impstats-mode udp
```
The installer will prompt to deploy a sample rsyslog impstats config to
/etc/rsyslog.d/10-impstats.conf. If the file exists, it will ask whether
to overwrite it. The deployed file uses `--impstats-udp-addr` and
`--impstats-udp-port` for the target. If you did not set
`--impstats-udp-addr`, the installer will use `--listen-addr` instead
(except when it is 0.0.0.0).
2. **Check status:**
```bash
sudo systemctl status rsyslog-exporter
journalctl -u rsyslog-exporter -f
```
#### Option 3: Docker Container (Sidecar Pattern)
1. **Build the container:**
```bash
cd sidecar
docker build -t rsyslog-exporter:latest .
```
2. **Run with docker-compose:**
```bash
docker-compose up -d
```
This starts:
- rsyslog container
- rsyslog-exporter sidecar
- (Optional) Prometheus for testing
3. **Or run standalone container:**
```bash
docker run -d \
  --name rsyslog-exporter \
  -v /var/log/rsyslog:/var/log/rsyslog:ro \
  -e IMPSTATS_MODE=file \
  -e IMPSTATS_PATH=/var/log/rsyslog/impstats.json \
  -e IMPSTATS_FORMAT=json \
  -p 9898:9898 \
  rsyslog-exporter:latest
```
#### Option 4: Kubernetes Sidecar
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: rsyslog-with-exporter
spec:
  containers:
    # Main rsyslog container
    - name: rsyslog
      image: rsyslog/rsyslog:latest
      volumeMounts:
        - name: impstats
          mountPath: /var/log/rsyslog
    # Prometheus exporter sidecar
    - name: exporter
      image: rsyslog-exporter:latest
      env:
        # File mode: read impstats from the shared volume
        - name: IMPSTATS_MODE
          value: file
        - name: IMPSTATS_PATH
          value: /var/log/rsyslog/impstats.json
        - name: IMPSTATS_FORMAT
          value: json
      ports:
        - containerPort: 9898
          name: metrics
      volumeMounts:
        - name: impstats
          mountPath: /var/log/rsyslog
          readOnly: true
  volumes:
    - name: impstats
      emptyDir: {}
```
### Validation
Run parser validation (no HTTP server needed):
```bash
./tests/run-validate.sh
```
Run UDP burst test (expects exporter on http://localhost:19898):
```bash
./tests/run-test-udp.sh
```
## File Growth & Mode Choice
rsyslog `impstats` **appends** to the file each interval, so file mode can grow unbounded. For production, **UDP mode is recommended** to avoid file growth and ensure each scrape reflects a single stats burst.
**UDP mode (recommended):**
```
ruleset(name="pstats") {
    action(type="omfwd" target="127.0.0.1" port="19090" protocol="udp")
}
module(load="impstats" interval="30" format="json" Ruleset="pstats" bracketing="on")
```
**File mode (only if you manage rotation):**
```
module(load="impstats"
       interval="30"
       format="json"
       log.file="/var/log/rsyslog/impstats.json"
       log.syslog="off"
       resetCounters="off")
```
If you must use file mode, configure log rotation (e.g., `copytruncate`) and monitor disk usage.
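A logrotate policy along these lines keeps the file bounded (an illustrative sketch; the path and rotation interval are assumptions to adapt to your setup):
```
/var/log/rsyslog/impstats.json {
    hourly
    rotate 4
    copytruncate
    compress
    missingok
    notifempty
}
```
`copytruncate` lets rsyslog keep appending to the same open file descriptor while the rotated copy is compressed away.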
## Production Checklist
- Use the systemd installer (Option 2) or run gunicorn directly.
- Keep `LISTEN_ADDR=127.0.0.1` unless you need remote scraping.
- Prefer UDP mode for stable, bounded memory and no file growth.
- If using file mode, set up log rotation and monitor disk usage.
- Restrict UDP sources with `ALLOWED_UDP_SOURCES` if packets are not loopback.
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `IMPSTATS_MODE` | `udp` | Input mode: `file` or `udp` |
| `IMPSTATS_PATH` | `/var/log/rsyslog/impstats.json` | Path to rsyslog impstats file (file mode) |
| `IMPSTATS_FORMAT` | `json` | Format: `json`, `prometheus`, or `cee` |
| `IMPSTATS_UDP_ADDR` | `127.0.0.1` | UDP bind address (UDP mode) - loopback only by default |
| `IMPSTATS_UDP_PORT` | `19090` | UDP port (UDP mode) |
| `STATS_COMPLETE_TIMEOUT` | `5` | Seconds to wait for burst completion (UDP mode) |
| `LISTEN_ADDR` | `127.0.0.1` | HTTP server bind address - **loopback only by default for security** |
| `LISTEN_PORT` | `9898` | HTTP server port |
| `LOG_LEVEL` | `INFO` | Logging level: `DEBUG`, `INFO`, `WARNING`, `ERROR` |
| `ALLOWED_UDP_SOURCES` | (unset) | Optional comma-separated IP allowlist for UDP packets (UDP mode). If unset or empty, all sources are allowed. If set to a non-empty string, packets from other sources are dropped and counted in `/health` as `dropped_messages`. |
| `MAX_BURST_BUFFER_LINES` | `10000` | Max lines buffered per stats burst (UDP mode). Excess lines are dropped to prevent DoS; track via `/health` `dropped_messages`. |
| `MAX_UDP_MESSAGE_SIZE` | `65535` | Max UDP message size accepted (bytes). Larger packets are dropped. |
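The drop rules in the table above can be sketched in a few lines (a simplified illustration; `accept_packet` and the constant names are hypothetical, not the exporter's actual internals):

```python
import os

# Hypothetical sketch of the UDP guard checks described in the table above.
ALLOWED = {s.strip() for s in os.environ.get("ALLOWED_UDP_SOURCES", "").split(",") if s.strip()}
MAX_SIZE = int(os.environ.get("MAX_UDP_MESSAGE_SIZE", "65535"))

def accept_packet(data: bytes, source_ip: str) -> bool:
    """Return True if a UDP packet passes the source and size checks."""
    if ALLOWED and source_ip not in ALLOWED:
        return False  # dropped and counted in /health dropped_messages
    if len(data) > MAX_SIZE:
        return False  # oversized packets are dropped
    return True
```

With `ALLOWED_UDP_SOURCES` unset, every source passes; once it is non-empty, only listed IPs do.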
## Security Highlights
- **Secure defaults**: HTTP and UDP bind to `127.0.0.1` by default.
- **DoS protection**: Burst buffer and UDP message size limits; dropped messages tracked in `/health`.
- **Source filtering**: Optional allowlist via `ALLOWED_UDP_SOURCES`.
**Expose metrics safely:**
- Same-host: keep `LISTEN_ADDR=127.0.0.1`.
- Remote Prometheus: bind to a specific internal/VPN IP or use firewall rules.
- Containers: `0.0.0.0` is acceptable with network isolation.
**Operational info exposure:** `/health` includes file paths and listener info.
Keep it on loopback or protect it with a reverse proxy/firewall.
**Security monitoring tips:**
```bash
# Check dropped UDP messages
curl http://localhost:9898/health | jq '.dropped_messages'
# Allowlist UDP sources (if binding to 0.0.0.0)
export ALLOWED_UDP_SOURCES=127.0.0.1,10.0.1.50
```
### Security Considerations
**Default bindings are secure for same-host deployment:**
- UDP listener: `127.0.0.1` (accepts impstats from local rsyslog only)
- HTTP server: `127.0.0.1` (metrics accessible to local Prometheus only)
**Common deployment scenarios:**
1. **Same-host Prometheus (most secure):**
```bash
IMPSTATS_UDP_ADDR=127.0.0.1 # rsyslog on same host
LISTEN_ADDR=127.0.0.1 # Prometheus on same host
```
2. **Remote Prometheus on VPN/internal network:**
```bash
IMPSTATS_UDP_ADDR=127.0.0.1 # rsyslog on same host
LISTEN_ADDR=10.0.1.50 # Bind to VPN interface
# Or use firewall rules with 0.0.0.0
```
3. **Container/Kubernetes sidecar:**
```bash
IMPSTATS_UDP_ADDR=0.0.0.0 # Accept from rsyslog container
LISTEN_ADDR=0.0.0.0 # Expose to Prometheus
# Network isolation provided by container network
```
4. **Bare metal with firewall:**
```bash
IMPSTATS_UDP_ADDR=127.0.0.1
LISTEN_ADDR=0.0.0.0 # Bind to all, restrict with firewall
# Firewall rule: allow port 9898 only from Prometheus servers
```
### rsyslog Configuration
Add the following to your rsyslog configuration (e.g., `/etc/rsyslog.d/impstats.conf`):
#### JSON Format (Recommended)
```
module(load="impstats"
       interval="30"
       format="json"
       log.file="/var/log/rsyslog/impstats.json"
       log.syslog="off"
       resetCounters="off")
```
#### Prometheus Native Format (rsyslog 8.2312.0+)
```
module(load="impstats"
       interval="30"
       format="prometheus"
       log.file="/var/log/rsyslog/impstats.prom"
       log.syslog="off")
```
Then configure the exporter:
```bash
export IMPSTATS_MODE=file
export IMPSTATS_PATH=/var/log/rsyslog/impstats.prom
export IMPSTATS_FORMAT=prometheus
```
#### CEE Format (Legacy)
```
module(load="impstats"
       interval="30"
       format="cee"
       log.file="/var/log/rsyslog/impstats.json"
       log.syslog="off")
```
Then configure the exporter:
```bash
export IMPSTATS_MODE=file
export IMPSTATS_FORMAT=cee
```
**Important**: Ensure the impstats file location is readable by the exporter process.
### Prometheus Scrape Configuration
Add to your `prometheus.yml`:
```yaml
scrape_configs:
  - job_name: 'rsyslog'
    scrape_interval: 30s
    static_configs:
      - targets: ['localhost:9898']
        labels:
          instance: 'rsyslog-main'
          environment: 'production'
```
## Best Practices
### Update Intervals
- **rsyslog impstats interval**: 30 seconds (balance between freshness and overhead)
- **Prometheus scrape interval**: 30-60 seconds (match or slightly exceed impstats interval)
- **Exporter refresh**: On-demand per scrape (reloads file if changed)
### Metric Naming
The exporter converts rsyslog impstats metrics to Prometheus format with the following conventions:
- Prefix: `rsyslog_`
- Component: Derived from `origin` field (e.g., `core`, `imtcp`, `core.action`)
- Metric: Original field name, sanitized for Prometheus
Example mappings:
| rsyslog field | Prometheus metric |
|---------------|-------------------|
| `{"name":"global","origin":"core","utime":"123456"}` | `rsyslog_core_utime{rsyslog_component="core",rsyslog_resource="global"} 123456` |
| `{"name":"action 0","origin":"core.action","processed":"1000"}` | `rsyslog_core_action_processed{rsyslog_component="core.action",rsyslog_resource="action_0"} 1000` |
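The mappings above can be illustrated with a small sketch (the helper names are hypothetical; the exporter's actual sanitization code may differ):

```python
import re

def prom_metric_name(origin: str, field: str) -> str:
    """Sketch of the naming convention: rsyslog_<origin>_<field>,
    with characters invalid in Prometheus metric names replaced by '_'."""
    raw = f"rsyslog_{origin}_{field}"
    return re.sub(r"[^a-zA-Z0-9_]", "_", raw)

def prom_label_value(name: str) -> str:
    """Sketch of resource label sanitization: spaces become underscores."""
    return name.replace(" ", "_")
```

For example, origin `core.action` and field `processed` yield `rsyslog_core_action_processed`, matching the table above.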
### Metric Types
The exporter automatically determines metric types based on field names:
- **Counters**: `processed`, `failed`, `submitted`, `utime`, `stime`, `called.*`, `bytes.*`
- **Gauges**: All other numeric fields (e.g., `size`, `maxrss`)
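As a sketch of this heuristic (names here are illustrative, not the exporter's actual code):

```python
# Field names treated as monotonically increasing counters;
# everything else numeric is reported as a gauge.
COUNTER_NAMES = {"processed", "failed", "submitted", "utime", "stime"}
COUNTER_PREFIXES = ("called.", "bytes.")

def metric_type(field: str) -> str:
    """Classify an impstats field as 'counter' or 'gauge'."""
    if field in COUNTER_NAMES or field.startswith(COUNTER_PREFIXES):
        return "counter"
    return "gauge"
```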
### Resource Limits
For production deployments:
- **Memory**: ~20-50 MB typical, ~100 MB with hundreds of metrics
- **CPU**: Negligible (file parsing on scrape only)
- **Disk I/O**: Minimal (reads impstats file on demand)
Recommended systemd limits:
```ini
MemoryMax=256M
TasksMax=10
```
### Production Notes
- Use gunicorn for production; UDP mode requires a single worker.
- For file mode, you can raise workers for HTTP concurrency.
- Keep `LISTEN_ADDR=127.0.0.1` unless protected by firewall or proxy.
Example production command:
```bash
./.venv/bin/python -m gunicorn \
--bind 127.0.0.1:9898 \
--workers 1 \
--threads 2 \
--timeout 30 \
--access-logfile /var/log/rsyslog-exporter/access.log \
--error-logfile /var/log/rsyslog-exporter/error.log \
rsyslog_exporter:application
```
### Security
**Network Binding Security:**
By default, the exporter binds to loopback interfaces for security:
- **UDP listener**: `127.0.0.1` - accepts impstats only from local rsyslog
- **HTTP metrics**: `127.0.0.1` - serves metrics only to local Prometheus
This is **secure for single-host deployments**. For other scenarios:
**Remote Prometheus access:**
1. **Bind to specific interface** (recommended):
```bash
LISTEN_ADDR=10.0.1.50 # Your VPN or internal network IP
```
2. **Use SSH tunneling**:
```bash
# On Prometheus server
ssh -L 9898:localhost:9898 rsyslog-host
```
3. **Bind to all + firewall** (use with caution):
```bash
LISTEN_ADDR=0.0.0.0
# Configure firewall to allow only Prometheus servers
```
**Additional security measures:**
1. **Run as non-root user** (default in Docker, configure in systemd)
2. **Read-only access** to impstats file (exporter only needs read)
3. **Firewall rules**: Restrict port 9898 to Prometheus servers only
4. **TLS/mTLS**: Use reverse proxy (nginx, Envoy) if metrics traverse untrusted networks
5. **Network segmentation**: Keep metrics endpoints on management/monitoring network
## Endpoints
### `/metrics`
Returns Prometheus text exposition format with all current metrics.
**Example:**
```
# HELP rsyslog_core_utime rsyslog metric rsyslog_core_utime
# TYPE rsyslog_core_utime counter
rsyslog_core_utime{rsyslog_component="core",rsyslog_resource="global"} 123456.0
# HELP rsyslog_core_action_processed rsyslog metric rsyslog_core_action_processed
# TYPE rsyslog_core_action_processed counter
rsyslog_core_action_processed{rsyslog_component="core.action",rsyslog_resource="action_0"} 4950.0
```
### `/health`
Returns JSON health status.
**Example:**
```json
{
"status": "healthy",
"uptime_seconds": 3600.5,
"impstats_file": "/var/log/rsyslog/impstats.json",
"impstats_format": "json",
"metrics_count": 42,
"parse_count": 120
}
```
## Testing
### Test with Sample Data
1. **Use provided sample files:**
```bash
# JSON format (file mode)
export IMPSTATS_MODE=file
export IMPSTATS_PATH=examples/sample-impstats.json
export IMPSTATS_FORMAT=json
python3 rsyslog_exporter.py
```
2. **Generate test data:**
```bash
# Simulate rsyslog writing to impstats
cat examples/sample-impstats.json > /tmp/test-impstats.json
export IMPSTATS_MODE=file
export IMPSTATS_PATH=/tmp/test-impstats.json
python3 rsyslog_exporter.py
```
3. **Verify metrics:**
```bash
curl http://localhost:9898/metrics
curl http://localhost:9898/health
```
### Integration Testing
```bash
# Start rsyslog and exporter with docker-compose
docker-compose up -d
# Send test syslog messages
logger -n localhost -P 514 "Test message"
# Wait for impstats update (30s)
sleep 35
# Check metrics
curl http://localhost:9898/metrics | grep rsyslog
# View in Prometheus
open http://localhost:9090
```
## Troubleshooting
### Exporter shows 0 metrics
**Cause**: Impstats file not found or empty
**Solution**:
- Check rsyslog configuration and restart rsyslog
- Verify file path and permissions
- Check rsyslog logs: `journalctl -u rsyslog -f`
### Metrics are stale
**Cause**: rsyslog not updating impstats file
**Solution**:
- Verify rsyslog `impstats` interval setting
- Check if rsyslog is processing messages
- Ensure impstats file location is writable by rsyslog
### Permission denied errors
**Cause**: Exporter cannot read impstats file
**Solution**:
```bash
# For systemd
sudo chown rsyslog:rsyslog /var/log/rsyslog/impstats.json
sudo chmod 644 /var/log/rsyslog/impstats.json
# For container
docker run -v /var/log/rsyslog:/var/log/rsyslog:ro ...
```
### High memory usage
**Cause**: Too many unique metric combinations (high cardinality)
**Solution**:
- Review rsyslog configuration for unnecessary stats
- Consider filtering metrics at Prometheus level
- Reduce Prometheus scrape frequency to lower parsing rate
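As a sketch of Prometheus-level filtering, a `metric_relabel_configs` rule can drop noisy series at scrape time (the regex below is an illustrative example; adjust it to the series you actually want to drop):
```yaml
scrape_configs:
  - job_name: 'rsyslog'
    static_configs:
      - targets: ['localhost:9898']
    metric_relabel_configs:
      # Drop per-action series if they dominate cardinality (example regex)
      - source_labels: [__name__]
        regex: 'rsyslog_core_action_.*'
        action: drop
```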
## Advanced Usage
### Custom Metric Filtering
To filter specific metrics, modify the parser functions in `rsyslog_exporter.py`:
```python
def parse_json_impstats(file_path: str) -> List[Metric]:
    # ... existing code ...

    # Skip certain origins
    if origin in ("debug", "internal"):
        continue

    # Only export specific metrics
    allowed_metrics = {"processed", "failed", "submitted"}
    if key not in allowed_metrics:
        continue
```
### Multiple Impstats Files
Run multiple exporter instances on different ports:
```bash
# Exporter 1
IMPSTATS_MODE=file IMPSTATS_PATH=/var/log/rsyslog1/impstats.json LISTEN_PORT=9898 python3 rsyslog_exporter.py &

# Exporter 2
IMPSTATS_MODE=file IMPSTATS_PATH=/var/log/rsyslog2/impstats.json LISTEN_PORT=9899 python3 rsyslog_exporter.py &
```
## Future Enhancements (Go Version)
A Go-based version is planned for future development with these advantages:
- **Smaller footprint**: ~5-10 MB binary vs ~30-50 MB Python
- **Better performance**: Lower latency, lower CPU usage
- **Static binary**: No runtime dependencies
- **Easier distribution**: Single binary for all platforms
The Python version will remain supported and shares the same configuration contract, ensuring seamless migration when the Go version becomes available.
## Contributing
Contributions are welcome! Please follow the rsyslog project guidelines in `CONTRIBUTING.md`.
## License
This sidecar is part of the rsyslog project and follows the same licensing terms. See `COPYING` in the repository root.
## Support
- **Issues**: https://github.com/rsyslog/rsyslog/issues
- **Discussions**: https://github.com/rsyslog/rsyslog/discussions
- **Documentation**: https://www.rsyslog.com/doc/
## See Also
- [rsyslog impstats module documentation](https://www.rsyslog.com/doc/master/configuration/modules/impstats.html)
- [Prometheus exposition formats](https://prometheus.io/docs/instrumenting/exposition_formats/)
- [Grafana dashboards for rsyslog](https://grafana.com/grafana/dashboards/)

@@ -0,0 +1,63 @@
version: '3.8'

services:
  # rsyslog service configured to send impstats via UDP
  rsyslog:
    image: rsyslog/rsyslog_dev_base_ubuntu:24.04
    container_name: rsyslog
    volumes:
      # Mount rsyslog config with UDP impstats
      - ./examples/rsyslog-udp.conf:/etc/rsyslog.conf:ro
    ports:
      - "514:514/tcp"
      - "514:514/udp"
    restart: unless-stopped
    networks:
      - rsyslog-net

  # Prometheus exporter sidecar (UDP listener mode)
  rsyslog-exporter:
    build: .
    container_name: rsyslog-exporter
    environment:
      IMPSTATS_MODE: udp
      IMPSTATS_FORMAT: json
      IMPSTATS_UDP_ADDR: 0.0.0.0
      IMPSTATS_UDP_PORT: 19090
      STATS_COMPLETE_TIMEOUT: "5"
      LISTEN_ADDR: 0.0.0.0
      LISTEN_PORT: 9898
      LOG_LEVEL: INFO
      # Optional: restrict UDP sources (comma-separated IPs)
      # ALLOWED_UDP_SOURCES: "127.0.0.1,10.0.1.50"
      # Note: UDP mode requires a single gunicorn worker (default in image).
      # Increase workers only for file mode.
    ports:
      - "9898:9898"        # Prometheus metrics endpoint
      - "19090:19090/udp"  # Receive impstats from rsyslog
    restart: unless-stopped
    networks:
      - rsyslog-net
    depends_on:
      - rsyslog

  # Optional: Local Prometheus instance for testing
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./examples/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    restart: unless-stopped
    networks:
      - rsyslog-net
    depends_on:
      - rsyslog-exporter

networks:
  rsyslog-net:
    driver: bridge

@@ -0,0 +1,55 @@
version: '3.8'

services:
  # rsyslog service (example configuration)
  rsyslog:
    image: rsyslog/rsyslog_dev_base_ubuntu:24.04
    container_name: rsyslog
    volumes:
      # Mount shared volume for impstats
      - impstats:/var/log/rsyslog
      # Mount rsyslog config (customize as needed)
      - ./examples/rsyslog.conf:/etc/rsyslog.conf:ro
    ports:
      - "514:514/tcp"
      - "514:514/udp"
    restart: unless-stopped

  # Prometheus exporter sidecar
  rsyslog-exporter:
    build: .
    container_name: rsyslog-exporter
    volumes:
      # Share impstats volume with rsyslog
      - impstats:/var/log/rsyslog:ro
    environment:
      IMPSTATS_MODE: file
      IMPSTATS_PATH: /var/log/rsyslog/impstats.json
      IMPSTATS_FORMAT: json
      LISTEN_ADDR: 0.0.0.0
      LISTEN_PORT: 9898
      LOG_LEVEL: INFO
    ports:
      - "9898:9898"
    restart: unless-stopped
    depends_on:
      - rsyslog

  # Optional: Local Prometheus instance for testing
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./examples/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    restart: unless-stopped
    depends_on:
      - rsyslog-exporter

volumes:
  impstats:
    driver: local

@@ -0,0 +1,27 @@
# Prometheus configuration for scraping rsyslog exporter
#
# Place this in /etc/prometheus/prometheus.yml or include it
global:
  scrape_interval: 30s
  evaluation_interval: 30s

scrape_configs:
  # rsyslog exporter sidecar
  - job_name: 'rsyslog'
    static_configs:
      - targets: ['rsyslog-exporter:9898']
        labels:
          instance: 'rsyslog-main'
          environment: 'production'
      # For bare-metal deployment, use hostname/IP
      # - targets: ['localhost:9898']

  # If running multiple rsyslog instances
  # - job_name: 'rsyslog-cluster'
  #   static_configs:
  #     - targets:
  #       - 'rsyslog-node1:9898'
  #       - 'rsyslog-node2:9898'
  #       - 'rsyslog-node3:9898'

@@ -0,0 +1,3 @@
@cee:{"name":"global","origin":"core","utime":"123456","stime":"78901","maxrss":"50000"}
@cee:{"name":"main queue","origin":"core.queue","size":"0","enqueued":"5000","full":"0","discarded.full":"0"}
@cee:{"name":"action 0","origin":"core.action","processed":"4950","failed":"5","suspended":"0"}

@@ -0,0 +1,5 @@
{"name":"global","origin":"core","utime":"123456","stime":"78901","maxrss":"50000","minflt":"1000","majflt":"10"}
{"name":"main queue","origin":"core.queue","size":"0","enqueued":"5000","full":"0","discarded.full":"0","discarded.nf":"0","maxqsize":"10000"}
{"name":"action 0","origin":"core.action","processed":"4950","failed":"5","suspended":"0","suspended.duration":"0","resumed":"0"}
{"name":"tcp","origin":"imtcp","submitted":"5000","bytes.rcvd":"500000","disallowed":"0"}
{"name":"resource-usage","origin":"impstats","utime":"100000","stime":"50000","maxrss":"75000","minflt":"2000","majflt":"5"}

@@ -0,0 +1,12 @@
# TYPE rsyslog_global_utime counter
rsyslog_global_utime{origin="core"} 123456
# TYPE rsyslog_global_stime counter
rsyslog_global_stime{origin="core"} 78901
# TYPE rsyslog_queue_size gauge
rsyslog_queue_size{name="main queue",origin="core.queue"} 0
# TYPE rsyslog_queue_enqueued counter
rsyslog_queue_enqueued{name="main queue",origin="core.queue"} 5000
# TYPE rsyslog_action_processed counter
rsyslog_action_processed{name="action 0",origin="core.action"} 4950
# TYPE rsyslog_action_failed counter
rsyslog_action_failed{name="action 0",origin="core.action"} 5

sidecar/install-service.sh Executable file

@@ -0,0 +1,278 @@
#!/usr/bin/env bash
## install-service.sh: Install rsyslog exporter as a systemd service.
##
## Usage:
## sudo ./install-service.sh [--prefix /opt/rsyslog-exporter] \
## [--user rsyslog] [--group rsyslog] \
## [--listen-addr 127.0.0.1] [--listen-port 9898] \
## [--select-listen-addr] \
## [--overwrite] \
## [--impstats-mode udp] [--impstats-format json] \
## [--impstats-udp-addr 127.0.0.1] [--impstats-udp-port 19090]
##
## This script:
## - installs the exporter into a fixed prefix
## - installs Python deps into a venv under the prefix
## - installs a systemd unit file and enables the service
set -euo pipefail
PREFIX="/opt/rsyslog-exporter"
RUN_USER="rsyslog"
RUN_GROUP="rsyslog"
LISTEN_ADDR="127.0.0.1"
LISTEN_PORT="9898"
LISTEN_ADDR_SET="false"
IMPSTATS_MODE="udp"
IMPSTATS_FORMAT="json"
IMPSTATS_UDP_ADDR="127.0.0.1"
IMPSTATS_UDP_PORT="19090"
IMPSTATS_UDP_ADDR_SET="false"
IMPSTATS_UDP_PORT_SET="false"
SELECT_LISTEN_ADDR="false"
OVERWRITE="false"
RSYSLOG_CONF_SOURCE=""
RSYSLOG_CONF_TARGET="/etc/rsyslog.d/10-impstats.conf"
prompt_yes_no() {
local prompt="$1"
local reply
read -rp "$prompt" reply
case "${reply:-}" in
[Yy]|[Yy][Ee][Ss]) return 0 ;;
*) return 1 ;;
esac
}
list_ipv4_addrs() {
local addrs=()
if command -v ip >/dev/null 2>&1; then
while IFS= read -r line; do
addrs+=("${line%%/*}")
done < <(ip -4 -o addr show | awk '{print $4}')
elif command -v ifconfig >/dev/null 2>&1; then
while IFS= read -r line; do
addrs+=("${line}")
done < <(ifconfig 2>/dev/null | awk '/inet / && !/inet6/ {print $2}' | sed 's/addr://')
fi
# Always include loopback and all-interfaces
addrs+=("127.0.0.1" "0.0.0.0")
# Deduplicate
printf "%s\n" "${addrs[@]}" | awk '!seen[$0]++'
}
select_listen_addr() {
if [ ! -t 0 ]; then
echo "Error: --select-listen-addr requires an interactive terminal" >&2
exit 1
fi
mapfile -t choices < <(list_ipv4_addrs)
if [ ${#choices[@]} -eq 0 ]; then
echo "Error: No IPv4 addresses found" >&2
exit 1
fi
echo "Available listen addresses:"
for i in "${!choices[@]}"; do
printf " [%d] %s\n" "$((i+1))" "${choices[$i]}"
done
echo ""
read -rp "Select listen address [1-${#choices[@]}]: " selection
if ! [[ "$selection" =~ ^[0-9]+$ ]] || [ "$selection" -lt 1 ] || [ "$selection" -gt ${#choices[@]} ]; then
echo "Error: Invalid selection" >&2
exit 1
fi
LISTEN_ADDR="${choices[$((selection-1))]}"
LISTEN_ADDR_SET="true"
}
while [[ $# -gt 0 ]]; do
case "$1" in
--prefix) PREFIX="$2"; shift 2 ;;
--user) RUN_USER="$2"; shift 2 ;;
--group) RUN_GROUP="$2"; shift 2 ;;
--listen-addr) LISTEN_ADDR="$2"; LISTEN_ADDR_SET="true"; shift 2 ;;
--listen-port) LISTEN_PORT="$2"; shift 2 ;;
--select-listen-addr) SELECT_LISTEN_ADDR="true"; shift 1 ;;
--overwrite) OVERWRITE="true"; shift 1 ;;
--impstats-mode) IMPSTATS_MODE="$2"; shift 2 ;;
--impstats-format) IMPSTATS_FORMAT="$2"; shift 2 ;;
--impstats-udp-addr) IMPSTATS_UDP_ADDR="$2"; IMPSTATS_UDP_ADDR_SET="true"; shift 2 ;;
--impstats-udp-port) IMPSTATS_UDP_PORT="$2"; IMPSTATS_UDP_PORT_SET="true"; shift 2 ;;
*) echo "Unknown argument: $1"; exit 2 ;;
esac
done
if [[ $EUID -ne 0 ]]; then
echo "This script must be run as root." >&2
exit 1
fi
if [ "$OVERWRITE" = "true" ]; then
safe_prefix="${PREFIX%/}"
if [ -z "$safe_prefix" ]; then
safe_prefix="/"
fi
case "$safe_prefix" in
/|/bin|/sbin|/usr|/etc|/var|/lib|/lib64|/boot|/dev|/proc|/sys|/run|/root|/home)
echo "Refusing to overwrite unsafe prefix: $PREFIX" >&2
exit 1
;;
esac
fi
if [ "$SELECT_LISTEN_ADDR" = "true" ]; then
select_listen_addr
fi
if [ "$IMPSTATS_UDP_ADDR_SET" = "false" ] && [ "$LISTEN_ADDR_SET" = "true" ] && [ "$LISTEN_ADDR" != "0.0.0.0" ]; then
IMPSTATS_UDP_ADDR="$LISTEN_ADDR"
fi
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
RSYSLOG_CONF_SOURCE="$SCRIPT_DIR/examples/rsyslog-impstats-udp.conf"
install -d -m 0755 "$PREFIX"
install -d -m 0755 "$PREFIX/bin"
install -d -m 0755 "$PREFIX/venv"
install -d -m 0755 /var/log/rsyslog-exporter
install -d -m 0755 /run/rsyslog-exporter
if [ -t 0 ] && [ -f "$RSYSLOG_CONF_SOURCE" ]; then
echo "Optional rsyslog impstats config deployment:"
echo " Source: $RSYSLOG_CONF_SOURCE"
echo " Target: $RSYSLOG_CONF_TARGET"
if [ "$IMPSTATS_MODE" != "udp" ]; then
echo " Note: The bundled config is for UDP mode."
fi
rsyslog_target_addr="$IMPSTATS_UDP_ADDR"
if [ "$IMPSTATS_UDP_ADDR_SET" = "false" ] && [ "$LISTEN_ADDR_SET" = "true" ] && [ "$LISTEN_ADDR" != "0.0.0.0" ]; then
rsyslog_target_addr="$LISTEN_ADDR"
fi
rsyslog_target_port="$IMPSTATS_UDP_PORT"
echo " Using target ${rsyslog_target_addr}:${rsyslog_target_port}"
if prompt_yes_no "Deploy this config to $RSYSLOG_CONF_TARGET? [y/N]: "; then
rsyslog_conf_tmp="$(mktemp)"
sed -e "s/target=\"[^\"]*\"/target=\"${rsyslog_target_addr}\"/" \
-e "s/port=\"[^\"]*\"/port=\"${rsyslog_target_port}\"/" \
"$RSYSLOG_CONF_SOURCE" > "$rsyslog_conf_tmp"
if [ -e "$RSYSLOG_CONF_TARGET" ]; then
if prompt_yes_no "File exists. Overwrite $RSYSLOG_CONF_TARGET? [y/N]: "; then
install -m 0644 "$rsyslog_conf_tmp" "$RSYSLOG_CONF_TARGET"
else
echo "Skipping rsyslog config deployment."
fi
else
install -m 0644 "$rsyslog_conf_tmp" "$RSYSLOG_CONF_TARGET"
fi
rm -f "$rsyslog_conf_tmp"
else
echo "Skipping rsyslog config deployment."
fi
else
echo "Skipping rsyslog config deployment (non-interactive or missing template)."
fi
if ! getent group "$RUN_GROUP" >/dev/null 2>&1; then
groupadd --system "$RUN_GROUP"
fi
if ! id -u "$RUN_USER" >/dev/null 2>&1; then
useradd --system --no-create-home --shell /usr/sbin/nologin -g "$RUN_GROUP" "$RUN_USER"
fi
chown -R "$RUN_USER:$RUN_GROUP" /var/log/rsyslog-exporter /run/rsyslog-exporter
if [ "$OVERWRITE" = "true" ]; then
if systemctl is-active --quiet rsyslog-exporter; then
systemctl stop rsyslog-exporter
fi
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete --exclude 'venv' --exclude '.git' "$SCRIPT_DIR/" "$PREFIX/"
else
find "${PREFIX:?}" -mindepth 1 -maxdepth 1 -exec rm -rf -- {} +
cp -a "$SCRIPT_DIR"/* "$PREFIX/"
fi
else
cp -a "$SCRIPT_DIR"/* "$PREFIX/"
fi
if [[ ! -x "$PREFIX/venv/bin/python" ]]; then
python3 -m venv "$PREFIX/venv"
fi
if [[ ! -x "$PREFIX/venv/bin/python" ]]; then
echo "Failed to create venv at $PREFIX/venv" >&2
exit 1
fi
"$PREFIX/venv/bin/python" -m ensurepip --upgrade
"$PREFIX/venv/bin/python" -m pip install --upgrade pip
"$PREFIX/venv/bin/python" -m pip install -r "$PREFIX/requirements.txt"
cat > /etc/systemd/system/rsyslog-exporter.service <<EOF
[Unit]
Description=rsyslog Prometheus Exporter
Documentation=https://github.com/rsyslog/rsyslog
After=network.target rsyslog.service
Wants=rsyslog.service
[Service]
Type=simple
User=${RUN_USER}
Group=${RUN_GROUP}
WorkingDirectory=${PREFIX}
ExecStart=/bin/sh -c "\\
${PREFIX}/venv/bin/python -m gunicorn \\
--bind \${LISTEN_ADDR}:\${LISTEN_PORT} \\
--workers 1 \\
--threads 2 \\
--timeout 30 \\
--access-logfile /var/log/rsyslog-exporter/access.log \\
--error-logfile /var/log/rsyslog-exporter/error.log \\
--log-level info \\
--pid /run/rsyslog-exporter/rsyslog-exporter.pid \\
rsyslog_exporter:application"
ExecReload=/bin/kill -s HUP \$MAINPID
Environment="IMPSTATS_MODE=${IMPSTATS_MODE}"
Environment="IMPSTATS_FORMAT=${IMPSTATS_FORMAT}"
Environment="IMPSTATS_UDP_ADDR=${IMPSTATS_UDP_ADDR}"
Environment="IMPSTATS_UDP_PORT=${IMPSTATS_UDP_PORT}"
Environment="LISTEN_ADDR=${LISTEN_ADDR}"
Environment="LISTEN_PORT=${LISTEN_PORT}"
Environment="LOG_LEVEL=INFO"
Environment="MAX_BURST_BUFFER_LINES=10000"
Environment="ALLOWED_UDP_SOURCES=${ALLOWED_UDP_SOURCES:-}"
Restart=on-failure
RestartSec=5s
KillMode=mixed
KillSignal=SIGTERM
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/log/rsyslog-exporter
RuntimeDirectory=rsyslog-exporter
MemoryMax=256M
TasksMax=50
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable rsyslog-exporter
systemctl restart rsyslog-exporter
systemctl status rsyslog-exporter --no-pager
echo "Installed and started rsyslog-exporter.service"

10
sidecar/requirements.txt Normal file
View File

@ -0,0 +1,10 @@
# rsyslog Prometheus Exporter Dependencies
# Prometheus client library for Python
prometheus-client>=0.19.0
# WSGI utilities for HTTP server
Werkzeug>=3.0.0
# Production WSGI server (recommended for production deployments)
gunicorn>=21.2.0

View File

@ -0,0 +1,81 @@
# systemd service file for rsyslog Prometheus Exporter
#
# Installation:
# 1. Copy this file to /etc/systemd/system/rsyslog-exporter.service
# 2. Create log directory: sudo mkdir -p /var/log/rsyslog-exporter
# 3. Adjust paths and environment variables as needed
# 4. Enable and start: sudo systemctl enable --now rsyslog-exporter
[Unit]
Description=rsyslog Prometheus Exporter
Documentation=https://github.com/rsyslog/rsyslog
After=network.target rsyslog.service
Wants=rsyslog.service
[Service]
# Gunicorn does not emit sd_notify; use simple for portability
Type=simple
User=rsyslog
Group=rsyslog
WorkingDirectory=/opt/rsyslog-exporter
# Production WSGI server with gunicorn (REQUIRED for production)
# UDP mode requires a single worker; file mode may use more workers.
ExecStart=/bin/sh -c "\
/opt/rsyslog-exporter/venv/bin/python -m gunicorn \
--bind ${LISTEN_ADDR}:${LISTEN_PORT} \
--workers 1 \
--threads 2 \
--timeout 30 \
--access-logfile /var/log/rsyslog-exporter/access.log \
--error-logfile /var/log/rsyslog-exporter/error.log \
--log-level info \
--pid /run/rsyslog-exporter/rsyslog-exporter.pid \
rsyslog_exporter:application"
# Graceful reload on configuration changes
ExecReload=/bin/kill -s HUP $MAINPID
# Environment variables for configuration
# Input mode: 'file' or 'udp' (udp recommended for production)
Environment="IMPSTATS_MODE=udp"
Environment="IMPSTATS_FORMAT=json"
Environment="IMPSTATS_UDP_ADDR=127.0.0.1"
Environment="IMPSTATS_UDP_PORT=19090"
# HTTP endpoint binding (use 127.0.0.1 for localhost-only access)
Environment="LISTEN_ADDR=127.0.0.1"
Environment="LISTEN_PORT=9898"
# Logging
Environment="LOG_LEVEL=INFO"
# Security limits
Environment="MAX_BURST_BUFFER_LINES=10000"
Environment="ALLOWED_UDP_SOURCES="
# NOTE: For file mode (not recommended), uncomment:
# Environment="IMPSTATS_MODE=file"
# Environment="IMPSTATS_PATH=/var/log/rsyslog/impstats.json"
# Restart policy
Restart=on-failure
RestartSec=5s
KillMode=mixed
KillSignal=SIGTERM
# Security hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/log/rsyslog-exporter
RuntimeDirectory=rsyslog-exporter
# Resource limits
MemoryMax=256M
TasksMax=50
[Install]
WantedBy=multi-user.target

919
sidecar/rsyslog_exporter.py Normal file
View File

@ -0,0 +1,919 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
#
# Copyright 2025-2026 Rainer Gerhards and Adiscon GmbH.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
rsyslog Prometheus Exporter
A lightweight sidecar process that consumes rsyslog impstats output (from a
file or a UDP listener) and exposes metrics in Prometheus format via an HTTP
endpoint.
Supports multiple impstats formats:
- json: Standard JSON format
- prometheus: Native Prometheus exposition format (rsyslog 8.2312.0+)
- cee: CEE/Lumberjack format with @cee cookie
Configuration via environment variables:
- IMPSTATS_MODE: Input mode: 'file' or 'udp' (default: udp)
- IMPSTATS_PATH: Path to impstats file when mode=file (default: /var/log/rsyslog/impstats.json)
- IMPSTATS_FORMAT: Format of impstats (json, prometheus, cee; default: json)
- IMPSTATS_UDP_PORT: UDP port to listen on when mode=udp (default: 19090)
- IMPSTATS_UDP_ADDR: UDP bind address when mode=udp (default: 127.0.0.1 - loopback only)
- STATS_COMPLETE_TIMEOUT: Seconds to wait for burst completion in UDP mode (default: 5)
- LISTEN_ADDR: Address to bind HTTP server (default: 127.0.0.1 - loopback only for security)
- LISTEN_PORT: Port for HTTP server (default: 9898)
- LOG_LEVEL: Logging level (DEBUG, INFO, WARNING, ERROR; default: INFO)
Security configuration (UDP mode):
- ALLOWED_UDP_SOURCES: Comma-separated list of allowed source IPs (default: empty = allow all)
- MAX_UDP_MESSAGE_SIZE: Maximum UDP packet size in bytes (default: 65535)
- MAX_BURST_BUFFER_LINES: Maximum lines in burst buffer (default: 10000, prevents DoS)
Security Notes:
- IMPSTATS_UDP_ADDR: Defaults to 127.0.0.1 (loopback). Use this for same-host rsyslog.
- LISTEN_ADDR: Defaults to 127.0.0.1 (loopback) for security. Set to:
- 127.0.0.1: Local Prometheus only (most secure)
- Specific IP: Bind to VPN/internal network interface
- 0.0.0.0: All interfaces (use with firewall rules)
- ALLOWED_UDP_SOURCES: Enable if UDP listener must bind to 0.0.0.0 (e.g., containers)
"""
import json
import logging
import os
import re
import socket
import sys
import threading
import time
from collections import defaultdict
from typing import Dict, List, Optional
from prometheus_client import generate_latest
from prometheus_client.core import (
CollectorRegistry,
CounterMetricFamily,
GaugeMetricFamily,
)
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response
def _int_env(name: str, default: str) -> int:
value = os.getenv(name, default)
try:
return int(value)
except ValueError as exc:
print(
f"FATAL: {name} must be an integer, got '{value}'.",
file=sys.stderr,
)
raise SystemExit(1) from exc
def _float_env(name: str, default: str) -> float:
value = os.getenv(name, default)
try:
return float(value)
except ValueError as exc:
print(
f"FATAL: {name} must be a number, got '{value}'.",
file=sys.stderr,
)
raise SystemExit(1) from exc
# Configuration from environment
IMPSTATS_MODE = os.getenv("IMPSTATS_MODE", "udp").lower()
IMPSTATS_PATH = os.getenv("IMPSTATS_PATH", "/var/log/rsyslog/impstats.json")
IMPSTATS_FORMAT = os.getenv("IMPSTATS_FORMAT", "json").lower()
IMPSTATS_UDP_PORT = _int_env("IMPSTATS_UDP_PORT", "19090")
IMPSTATS_UDP_ADDR = os.getenv("IMPSTATS_UDP_ADDR", "127.0.0.1")
STATS_COMPLETE_TIMEOUT = _float_env("STATS_COMPLETE_TIMEOUT", "5")
LISTEN_ADDR = os.getenv("LISTEN_ADDR", "127.0.0.1") # Changed default to loopback for security
LISTEN_PORT = _int_env("LISTEN_PORT", "9898")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# Security limits
MAX_UDP_MESSAGE_SIZE = _int_env("MAX_UDP_MESSAGE_SIZE", "65535") # Max UDP packet size
MAX_BURST_BUFFER_LINES = _int_env("MAX_BURST_BUFFER_LINES", "10000") # Prevent memory exhaustion
ALLOWED_UDP_SOURCES = os.getenv("ALLOWED_UDP_SOURCES", "") # Comma-separated IPs, empty = allow all
# Logging setup
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class Metric:
"""Internal representation of a metric."""
def __init__(self, name: str, value: float, labels: Dict[str, str], metric_type: str = "gauge"):
self.name = name
self.value = value
self.labels = labels
self.metric_type = metric_type # "gauge" or "counter"
def __repr__(self):
return f"Metric({self.name}={self.value}, labels={self.labels}, type={self.metric_type})"
def sanitize_metric_name(name: str) -> str:
"""
Sanitize metric name to conform to Prometheus naming conventions.
- Replace invalid characters with underscores
- Ensure it starts with a letter or underscore
- Convert to lowercase
"""
# Replace invalid characters
name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
# Ensure starts with letter or underscore
if name and not re.match(r'^[a-zA-Z_]', name):
name = '_' + name
return name.lower()
def sanitize_label_name(name: str) -> str:
"""Sanitize label name for Prometheus."""
name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
if name and not re.match(r'^[a-zA-Z_]', name):
name = '_' + name
return name
COUNTER_KEYS = {
"processed", "failed", "submitted", "utime", "stime", "resumed",
"enqueued", "discarded.full", "discarded.nf", "bytes.rcvd", "bytes.sent",
}
COUNTER_PREFIXES = ("called.",)
COUNTER_SUFFIXES = (".rcvd", ".sent", ".enqueued")
def build_base_labels(origin: str, name: str) -> Dict[str, str]:
return {
"rsyslog_component": origin,
"rsyslog_resource": sanitize_label_name(name),
}
def build_metric_name(origin: str, key: str) -> str:
return f"rsyslog_{sanitize_metric_name(origin)}_{sanitize_metric_name(key)}"
def is_counter_key(key: str) -> bool:
return (
key in COUNTER_KEYS or
key.startswith(COUNTER_PREFIXES) or
key.endswith(COUNTER_SUFFIXES)
)
def parse_numeric_value(value) -> Optional[float]:
try:
return float(value)
except (ValueError, TypeError):
return None
def parse_json_object(obj: Dict[str, object]) -> List[Metric]:
metrics: List[Metric] = []
name = obj.get("name", "unknown")
origin = obj.get("origin", "unknown")
base_labels = build_base_labels(origin, name)
for key, value in obj.items():
if key in ("name", "origin"):
continue
numeric_value = parse_numeric_value(value)
if numeric_value is None:
continue
metric_type = "counter" if is_counter_key(key) else "gauge"
metric_name = build_metric_name(origin, key)
metrics.append(Metric(
name=metric_name,
value=numeric_value,
labels=base_labels.copy(),
metric_type=metric_type,
))
return metrics
def parse_json_impstats(file_path: str) -> List[Metric]:
"""
Parse rsyslog impstats in JSON format from a file.
Expected format (one JSON object per line):
{"name":"global","origin":"core","utime":"123456","stime":"78901",...}
{"name":"action 0","origin":"core.action","processed":"1000","failed":"5",...}
"""
metrics = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
metrics.extend(parse_json_line(line, line_num))
except FileNotFoundError:
logger.error(f"Impstats file not found: {file_path}")
except Exception as e:
logger.error(f"Error parsing JSON impstats: {e}", exc_info=True)
return metrics
def parse_json_line(line: str, line_num: int = 0) -> List[Metric]:
"""
Parse a single JSON line from impstats into metrics.
Can be used by both file and UDP parsers.
"""
try:
obj = json.loads(line)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON at line {line_num}: {e}")
return []
if not isinstance(obj, dict):
logger.warning(f"Unexpected JSON structure at line {line_num}: {type(obj).__name__}")
return []
return parse_json_object(obj)
def parse_json_lines(lines: List[str]) -> List[Metric]:
"""Parse multiple JSON lines (e.g., from UDP burst)."""
metrics = []
for i, line in enumerate(lines):
line = line.strip()
if line:
metrics.extend(parse_json_line(line, i + 1))
return metrics
def parse_prometheus_impstats(file_path: str) -> List[Metric]:
"""
Parse rsyslog impstats in native Prometheus format.
This format is already Prometheus exposition format, so we parse it
and convert to our internal representation for consistency.
Example format:
# TYPE rsyslog_global_utime counter
rsyslog_global_utime{origin="core"} 123456
"""
metrics = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
current_type = "gauge"
for line in f:
line = line.strip()
# Skip empty lines and help comments
if not line or line.startswith("# HELP"):
continue
# Parse TYPE directive
if line.startswith("# TYPE"):
parts = line.split()
if len(parts) >= 4:
current_type = parts[3] # gauge, counter, etc.
continue
# Skip other comments
if line.startswith("#"):
continue
# Parse metric line: metric_name{labels} value
match = re.match(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s*(\{[^}]*\})?\s+([0-9.eE+\-]+)', line)
if not match:
continue
metric_name = match.group(1)
labels_str = match.group(2) or "{}"
try:
value = float(match.group(3))
except ValueError:
continue
# Parse labels
labels = {}
if labels_str != "{}":
labels_str = labels_str.strip("{}")
for label_pair in labels_str.split(","):
if "=" in label_pair:
k, v = label_pair.split("=", 1)
labels[k.strip()] = v.strip().strip('"')
metrics.append(Metric(
name=metric_name,
value=value,
labels=labels,
metric_type="counter" if current_type == "counter" else "gauge",
))
except FileNotFoundError:
logger.error(f"Impstats file not found: {file_path}")
except Exception as e:
logger.error(f"Error parsing Prometheus impstats: {e}", exc_info=True)
return metrics
def parse_cee_impstats(file_path: str) -> List[Metric]:
"""
Parse rsyslog impstats in CEE/Lumberjack format.
Format: Lines starting with "@cee:" followed by JSON.
Example: @cee:{"name":"global","origin":"core","utime":"123456"}
Falls back to plain JSON parsing if no @cee cookie found.
"""
metrics = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
# Check for @cee cookie
if line.startswith("@cee:"):
json_str = line[5:].strip()
else:
# Fallback: treat as plain JSON
json_str = line
try:
obj = json.loads(json_str)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse CEE JSON at line {line_num}: {e}")
continue
if not isinstance(obj, dict):
logger.warning(
f"Unexpected CEE JSON structure at line {line_num}: {type(obj).__name__}"
)
continue
metrics.extend(parse_json_object(obj))
except FileNotFoundError:
logger.error(f"Impstats file not found: {file_path}")
except Exception as e:
logger.error(f"Error parsing CEE impstats: {e}", exc_info=True)
return metrics
class UdpStatsListener:
"""
UDP listener for receiving impstats messages from rsyslog.
Handles burst reception with completion timeout:
- Receives all messages in a burst
- Waits STATS_COMPLETE_TIMEOUT seconds after last message
- Then updates metrics atomically
"""
def __init__(self, addr: str, port: int, format_type: str, completion_timeout: float,
allowed_sources: Optional[List[str]] = None):
self.addr = addr
self.port = port
self.format_type = format_type
self.completion_timeout = completion_timeout
self.allowed_sources = allowed_sources or [] # Empty list = allow all
self.sock = None
self.running = False
self.thread = None
# Metrics storage
self.cached_metrics: List[Metric] = []
self.metrics_lock = threading.Lock()
self.parse_count = 0
# Burst handling
self.burst_buffer: List[str] = []
self.last_receive_time = 0
self.dropped_messages = 0 # Track dropped messages for security monitoring
# Select parser for lines
if format_type == "json":
self.line_parser = parse_json_lines
elif format_type == "cee":
self.line_parser = self._parse_cee_lines
else:
logger.warning(f"UDP mode does not support format '{format_type}', using json")
self.line_parser = parse_json_lines
def _parse_cee_lines(self, lines: List[str]) -> List[Metric]:
"""Parse CEE format lines."""
json_lines = []
for line in lines:
if line.startswith("@cee:"):
json_lines.append(line[5:].strip())
else:
json_lines.append(line)
return parse_json_lines(json_lines)
def start(self):
"""Start the UDP listener in a background thread."""
if self.running:
return
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.addr, self.port))
self.sock.settimeout(1.0) # 1 second timeout for receive
self.running = True
self.thread = threading.Thread(target=self._listen_loop, daemon=True)
self.thread.start()
logger.info(f"UDP listener started on {self.addr}:{self.port}")
except Exception as e:
logger.error(f"Failed to start UDP listener: {e}", exc_info=True)
raise
def stop(self):
"""Stop the UDP listener."""
self.running = False
if self.thread:
self.thread.join(timeout=5)
if self.sock:
self.sock.close()
logger.info("UDP listener stopped")
def _listen_loop(self):
"""Main listening loop - runs in background thread."""
logger.debug("UDP listener loop started")
while self.running:
try:
# Try to receive data
try:
data, addr = self.sock.recvfrom(MAX_UDP_MESSAGE_SIZE)
if data:
# Source IP filtering
source_ip = addr[0]
if self.allowed_sources and source_ip not in self.allowed_sources:
logger.warning(f"Rejected UDP packet from unauthorized source: {source_ip}")
self.dropped_messages += 1
continue
message = data.decode('utf-8', errors='ignore')
self._handle_message(message)
except socket.timeout:
# Check if burst is complete
self._check_burst_completion()
continue
except Exception as e:
if self.running:
logger.error(f"Error in UDP listener: {e}", exc_info=True)
logger.debug("UDP listener loop ended")
def _handle_message(self, message: str):
"""Handle received UDP message (may contain multiple lines)."""
lines = message.splitlines()
with self.metrics_lock:
# Prevent buffer overflow attacks
if len(self.burst_buffer) + len(lines) > MAX_BURST_BUFFER_LINES:
logger.warning(f"Burst buffer limit reached ({MAX_BURST_BUFFER_LINES} lines), "
f"dropping {len(lines)} new lines. Possible DoS attempt or misconfiguration.")
self.dropped_messages += 1
return
self.burst_buffer.extend(lines)
self.last_receive_time = time.time()
logger.debug(f"Received {len(lines)} lines, buffer now has {len(self.burst_buffer)} lines")
def _check_burst_completion(self):
"""Check if burst is complete and process if so."""
burst_lines = None
with self.metrics_lock:
if not self.burst_buffer:
return
time_since_last = time.time() - self.last_receive_time
if time_since_last < self.completion_timeout:
return
# Burst is complete, copy buffer and release lock before parsing
burst_lines = self.burst_buffer
self.burst_buffer = []
logger.debug(f"Burst complete ({len(burst_lines)} lines), processing...")
try:
new_metrics = self.line_parser(burst_lines)
with self.metrics_lock:
self.cached_metrics = new_metrics
self.parse_count += 1
logger.info(f"Updated {len(new_metrics)} metrics from UDP burst (parse #{self.parse_count})")
except Exception as e:
logger.error(f"Error parsing UDP burst: {e}", exc_info=True)
def get_metrics(self) -> List[Metric]:
"""Get current metrics (thread-safe)."""
with self.metrics_lock:
return self.cached_metrics.copy()
class ImpstatsCollector:
"""
Prometheus collector that reads and parses impstats.
Supports both file-based and UDP listener modes.
"""
def __init__(self, mode: str, file_path: Optional[str] = None, format_type: str = "json",
udp_addr: Optional[str] = None, udp_port: Optional[int] = None, completion_timeout: float = 5):
self.mode = mode
self.format_type = format_type
self.parse_count = 0
if mode == "file":
if not file_path:
raise ValueError("file_path required for file mode")
self.file_path = file_path
self.last_mtime = 0
self.cached_metrics: List[Metric] = []
# Select parser
if format_type == "json":
self.parser = parse_json_impstats
elif format_type == "prometheus":
self.parser = parse_prometheus_impstats
elif format_type == "cee":
self.parser = parse_cee_impstats
else:
logger.warning(f"Unknown format: {format_type}, defaulting to json")
self.parser = parse_json_impstats
self.udp_listener = None
elif mode == "udp":
if not udp_addr or not udp_port:
raise ValueError("udp_addr and udp_port required for UDP mode")
# Parse allowed sources
allowed_sources = []
if ALLOWED_UDP_SOURCES:
allowed_sources = [ip.strip() for ip in ALLOWED_UDP_SOURCES.split(',') if ip.strip()]
logger.info(f"UDP source filtering enabled: {allowed_sources}")
self.udp_listener = UdpStatsListener(udp_addr, udp_port, format_type,
completion_timeout, allowed_sources)
self.udp_listener.start()
self.file_path = None
self.cached_metrics = []
else:
raise ValueError(f"Unknown mode: {mode}, must be 'file' or 'udp'")
def refresh_if_needed(self):
"""Check file mtime and refresh cache if file has changed (file mode only)."""
if self.mode != "file":
return
try:
current_mtime = os.path.getmtime(self.file_path)
if current_mtime != self.last_mtime:
logger.debug(f"File modified, refreshing metrics (mtime: {current_mtime})")
self.cached_metrics = self.parser(self.file_path)
self.last_mtime = current_mtime
self.parse_count += 1
logger.info(f"Loaded {len(self.cached_metrics)} metrics from {self.file_path} (parse #{self.parse_count})")
except FileNotFoundError:
if self.cached_metrics:
logger.warning(f"Impstats file disappeared: {self.file_path}")
self.cached_metrics = []
except Exception as e:
logger.error(f"Error checking file mtime: {e}", exc_info=True)
def get_current_metrics(self) -> List[Metric]:
"""Get current metrics based on mode."""
if self.mode == "file":
self.refresh_if_needed()
return self.cached_metrics
elif self.mode == "udp":
return self.udp_listener.get_metrics()
return []
def get_parse_count(self) -> int:
"""Get total number of times metrics were parsed/updated."""
if self.mode == "file":
return self.parse_count
elif self.mode == "udp":
return self.udp_listener.parse_count
return 0
def collect(self):
"""
Called by Prometheus client library on each scrape.
Returns metric families.
"""
current_metrics = self.get_current_metrics()
# Group metrics by name
metric_groups: Dict[str, List[Metric]] = defaultdict(list)
for metric in current_metrics:
metric_groups[metric.name].append(metric)
# Emit metric families
for name, metrics in metric_groups.items():
if not metrics:
continue
# Determine type from first metric
metric_type = metrics[0].metric_type
if metric_type == "counter":
family = CounterMetricFamily(
name,
f"rsyslog metric {name}",
labels=list(metrics[0].labels.keys()) if metrics[0].labels else None,
)
else:
family = GaugeMetricFamily(
name,
f"rsyslog metric {name}",
labels=list(metrics[0].labels.keys()) if metrics[0].labels else None,
)
for metric in metrics:
if metric.labels:
family.add_metric(
labels=list(metric.labels.values()),
value=metric.value,
)
else:
family.add_metric([], metric.value)
yield family
class ExporterApp:
"""WSGI application for the Prometheus exporter."""
def __init__(self, collector: ImpstatsCollector):
self.collector = collector
self.registry = CollectorRegistry()
self.registry.register(collector)
self.start_time = time.time()
def __call__(self, environ, start_response):
request = Request(environ)
if request.path == "/metrics":
# Check if we have any metrics to export
current_metrics = self.collector.get_current_metrics()
if not current_metrics:
# Best practice: return 503 with explanatory comment when no metrics available
# This allows Prometheus to distinguish between "no data" and "service down"
error_message = (
"# No metrics available\n"
"# The rsyslog exporter has not yet collected any statistics.\n"
)
if self.collector.mode == "file":
if not os.path.exists(self.collector.file_path):
error_message += f"# Reason: impstats file does not exist: {self.collector.file_path}\n"
else:
error_message += "# Reason: impstats file is empty or contains no valid metrics\n"
elif self.collector.mode == "udp":
error_message += "# Reason: No statistics received via UDP yet. Waiting for rsyslog to send data.\n"
response = Response(
error_message,
status=503,
mimetype="text/plain; version=0.0.4"
)
else:
# Generate Prometheus exposition format
output = generate_latest(self.registry)
response = Response(output, mimetype="text/plain; version=0.0.4")
elif request.path == "/health" or request.path == "/":
# Health check endpoint
uptime = time.time() - self.start_time
current_metrics = self.collector.get_current_metrics()
# Derive simple health status
status = "healthy"
try:
if self.collector.mode == "file":
# degraded if file missing or metrics are empty after a parse
if not os.path.exists(self.collector.file_path):
status = "degraded"
if len(current_metrics) == 0 and self.collector.get_parse_count() > 0:
status = "degraded"
elif self.collector.mode == "udp":
if getattr(self.collector.udp_listener, "dropped_messages", 0) > 0:
status = "degraded"
except Exception:
status = "degraded"
health_info = {
"status": status,
"uptime_seconds": round(uptime, 2),
"mode": self.collector.mode,
"metrics_count": len(current_metrics),
"parse_count": self.collector.get_parse_count(),
}
if self.collector.mode == "file":
health_info["impstats_file"] = self.collector.file_path
health_info["impstats_format"] = self.collector.format_type
elif self.collector.mode == "udp":
health_info["udp_addr"] = self.collector.udp_listener.addr
health_info["udp_port"] = self.collector.udp_listener.port
health_info["impstats_format"] = self.collector.format_type
health_info["dropped_messages"] = self.collector.udp_listener.dropped_messages
if self.collector.udp_listener.allowed_sources:
health_info["source_filtering"] = "enabled"
response = Response(
json.dumps(health_info, indent=2) + "\n",
mimetype="application/json",
)
else:
response = Response("Not Found\n", status=404)
return response(environ, start_response)
# Global collector instance for WSGI application
_collector = None
def create_app():
"""WSGI application factory for production servers (gunicorn, uwsgi, etc.)."""
global _collector
# Validate configuration
if IMPSTATS_MODE not in ("file", "udp"):
logger.error(f"Invalid IMPSTATS_MODE: {IMPSTATS_MODE}")
sys.exit(1)
if IMPSTATS_FORMAT not in ("json", "prometheus", "cee"):
logger.error(f"Invalid IMPSTATS_FORMAT: {IMPSTATS_FORMAT}")
sys.exit(1)
if not (1 <= LISTEN_PORT <= 65535):
logger.error(f"Invalid LISTEN_PORT: {LISTEN_PORT}")
sys.exit(1)
if IMPSTATS_MODE == "udp" and not (1 <= IMPSTATS_UDP_PORT <= 65535):
logger.error(f"Invalid IMPSTATS_UDP_PORT: {IMPSTATS_UDP_PORT}")
sys.exit(1)
# Enforce single worker in UDP mode to avoid socket binding clashes with gunicorn
if IMPSTATS_MODE == "udp":
detected_workers = 1
# common envs: WEB_CONCURRENCY, GUNICORN_WORKERS, WORKERS
for env_key in ("WEB_CONCURRENCY", "GUNICORN_WORKERS", "WORKERS"):
val = os.getenv(env_key)
if val:
try:
detected_workers = int(val)
break
except ValueError:
pass
if detected_workers == 1:
# parse from GUNICORN_CMD_ARGS like "--workers 4" or "-w 4"
cmd_args = os.getenv("GUNICORN_CMD_ARGS", "")
m = re.search(r"--workers\s+(\d+)", cmd_args)
if not m:
m = re.search(r"(?:\s|^)\-w\s+(\d+)(?:\s|$)", cmd_args)
if m:
try:
detected_workers = int(m.group(1))
except ValueError:
detected_workers = 1
if detected_workers > 1 and os.getenv("RSYSLOG_EXPORTER_ALLOW_MULTI_WORKER_UDP", "0") != "1":
logger.error(
"UDP mode requires a single worker to avoid socket clashes. Detected workers=%d. "
"Set --workers 1 or export RSYSLOG_EXPORTER_ALLOW_MULTI_WORKER_UDP=1 to override (not recommended).",
detected_workers,
)
sys.exit(1)
logger.info("=" * 60)
logger.info("rsyslog Prometheus Exporter initializing")
logger.info("=" * 60)
logger.info(f"Mode: {IMPSTATS_MODE}")
logger.info(f"Impstats format: {IMPSTATS_FORMAT}")
logger.info(f"HTTP bind: {LISTEN_ADDR}:{LISTEN_PORT}")
logger.info(f"Log level: {LOG_LEVEL}")
if IMPSTATS_MODE == "file":
logger.info(f"File path: {IMPSTATS_PATH}")
# Validate impstats file exists
if not os.path.exists(IMPSTATS_PATH):
logger.warning(f"Impstats file does not exist yet: {IMPSTATS_PATH}")
logger.warning("Exporter will start but metrics will be empty until file is created")
# Create collector in file mode
_collector = ImpstatsCollector(
mode="file",
file_path=IMPSTATS_PATH,
format_type=IMPSTATS_FORMAT
)
# Do initial load
try:
_collector.refresh_if_needed()
logger.info(f"Initial load: {len(_collector.cached_metrics)} metrics")
except Exception as e:
logger.error(f"Initial load failed: {e}")
elif IMPSTATS_MODE == "udp":
logger.info(f"UDP listen: {IMPSTATS_UDP_ADDR}:{IMPSTATS_UDP_PORT}")
logger.info(f"Burst completion timeout: {STATS_COMPLETE_TIMEOUT}s")
# Create collector in UDP mode
_collector = ImpstatsCollector(
mode="udp",
format_type=IMPSTATS_FORMAT,
udp_addr=IMPSTATS_UDP_ADDR,
udp_port=IMPSTATS_UDP_PORT,
completion_timeout=STATS_COMPLETE_TIMEOUT
)
logger.info("UDP listener started, waiting for stats from rsyslog...")
else:
logger.error(f"Invalid IMPSTATS_MODE: {IMPSTATS_MODE}, must be 'file' or 'udp'")
sys.exit(1)
# Create WSGI app
app = ExporterApp(_collector)
logger.info("=" * 60)
logger.info("Application initialized successfully")
logger.info("=" * 60)
return app
def main():
"""Development server entry point (uses Werkzeug - not for production!)."""
logger.warning("*" * 60)
logger.warning("DEVELOPMENT MODE - Not suitable for production!")
logger.warning("For production, use: gunicorn --workers 1 -b %s:%d rsyslog_exporter:application", LISTEN_ADDR, LISTEN_PORT)
logger.warning("*" * 60)
logger.info(f"Starting development server at http://{LISTEN_ADDR}:{LISTEN_PORT}/metrics")
try:
run_simple(
LISTEN_ADDR,
LISTEN_PORT,
application,
use_reloader=False,
use_debugger=False,
threaded=True,
)
except KeyboardInterrupt:
logger.info("Received shutdown signal, exiting")
except Exception as e:
logger.error(f"Server error: {e}", exc_info=True)
sys.exit(1)
# WSGI application instance for production servers (gunicorn, uwsgi, etc.)
# Created at module import time so it's available for both gunicorn and direct execution
application = create_app()
if __name__ == "__main__":
main()
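The counter/gauge heuristic and metric naming above can be exercised in isolation. This is a minimal standalone sketch that mirrors `is_counter_key()`, `sanitize_metric_name()`, and `build_metric_name()` from the exporter (it is not the exporter's own code path, just the same logic reduced for illustration):

```python
# Sketch of the exporter's counter/gauge classification and metric naming.
import re

COUNTER_KEYS = {"processed", "failed", "submitted", "utime", "stime",
                "resumed", "enqueued", "discarded.full", "discarded.nf",
                "bytes.rcvd", "bytes.sent"}
COUNTER_PREFIXES = ("called.",)
COUNTER_SUFFIXES = (".rcvd", ".sent", ".enqueued")

def sanitize(name: str) -> str:
    # Replace invalid characters, force a valid leading char, lowercase.
    name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
    if name and not re.match(r"^[a-zA-Z_]", name):
        name = "_" + name
    return name.lower()

def classify(key: str) -> str:
    # Counters are monotonically increasing totals; everything else is a gauge.
    if (key in COUNTER_KEYS or key.startswith(COUNTER_PREFIXES)
            or key.endswith(COUNTER_SUFFIXES)):
        return "counter"
    return "gauge"

def metric_name(origin: str, key: str) -> str:
    return f"rsyslog_{sanitize(origin)}_{sanitize(key)}"

print(metric_name("core.action", "processed"))  # rsyslog_core_action_processed
print(classify("processed"))                    # counter
print(classify("size"))                         # gauge
```

A key like `discarded.full` thus becomes `rsyslog_core_action_discarded_full` and is exported as a counter.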

29
sidecar/start-file.sh Executable file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
#
## Start rsyslog Prometheus exporter (file mode)
##
## Uses venv if present, otherwise falls back to system python.
## You can override any env var by exporting it before running this script.
#
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
PYTHON="python3"
if [[ -x "$SCRIPT_DIR/.venv/bin/python" ]]; then
PYTHON="$SCRIPT_DIR/.venv/bin/python"
elif [[ -x "$SCRIPT_DIR/venv/bin/python3" ]]; then
PYTHON="$SCRIPT_DIR/venv/bin/python3"
fi
export IMPSTATS_MODE="${IMPSTATS_MODE:-file}"
export IMPSTATS_FORMAT="${IMPSTATS_FORMAT:-json}"
export IMPSTATS_PATH="${IMPSTATS_PATH:-$SCRIPT_DIR/examples/sample-impstats.json}"
export LISTEN_ADDR="${LISTEN_ADDR:-127.0.0.1}"
export LISTEN_PORT="${LISTEN_PORT:-9898}"
export LOG_LEVEL="${LOG_LEVEL:-INFO}"
exec "$PYTHON" "$SCRIPT_DIR/rsyslog_exporter.py"

45
sidecar/start-udp.sh Executable file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env bash
#
## Start rsyslog Prometheus exporter (UDP mode)
##
## Uses venv if present, otherwise falls back to system python.
## You can override any env var by exporting it before running this script.
#
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
PYTHON="python3"
if [[ -x "$SCRIPT_DIR/.venv/bin/python" ]]; then
PYTHON="$SCRIPT_DIR/.venv/bin/python"
elif [[ -x "$SCRIPT_DIR/venv/bin/python3" ]]; then
PYTHON="$SCRIPT_DIR/venv/bin/python3"
fi
export IMPSTATS_MODE="${IMPSTATS_MODE:-udp}"
export IMPSTATS_FORMAT="${IMPSTATS_FORMAT:-json}"
export IMPSTATS_UDP_ADDR="${IMPSTATS_UDP_ADDR:-127.0.0.1}"
export IMPSTATS_UDP_PORT="${IMPSTATS_UDP_PORT:-19090}"
export STATS_COMPLETE_TIMEOUT="${STATS_COMPLETE_TIMEOUT:-5}"
export LISTEN_ADDR="${LISTEN_ADDR:-127.0.0.1}"
export LISTEN_PORT="${LISTEN_PORT:-9898}"
# Security/DoS knobs (optional)
export MAX_UDP_MESSAGE_SIZE="${MAX_UDP_MESSAGE_SIZE:-65535}"
export MAX_BURST_BUFFER_LINES="${MAX_BURST_BUFFER_LINES:-10000}"
export ALLOWED_UDP_SOURCES="${ALLOWED_UDP_SOURCES:-}"
export LOG_LEVEL="${LOG_LEVEL:-INFO}"
exec "$PYTHON" -m gunicorn \
    --bind "${LISTEN_ADDR}:${LISTEN_PORT}" \
    --workers 1 \
    --threads 2 \
    --timeout 30 \
    --access-logfile - \
    --error-logfile - \
    --log-level "${LOG_LEVEL,,}" \
    rsyslog_exporter:application
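ALLOWED_UDP_SOURCES is the optional source filter named in the commit message; its exact matching semantics live in rsyslog_exporter.py, but one plausible sketch of such a check (hypothetical `parse_allowed_sources`/`source_allowed` helpers, assuming a comma-separated list of IPs or CIDR networks, with an empty value meaning allow-all) looks like:

```python
import ipaddress

def parse_allowed_sources(spec):
    """Parse 'ip,cidr,...' into networks; an empty string means no filtering."""
    return [ipaddress.ip_network(part.strip(), strict=False)
            for part in spec.split(",") if part.strip()]

def source_allowed(addr, allowed):
    """True if addr may send stats; an empty allow-list admits everyone."""
    if not allowed:
        return True
    ip = ipaddress.ip_address(addr)
    return any(ip in net for net in allowed)
```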

22
sidecar/tests/run-test-udp.sh Executable file

@ -0,0 +1,22 @@
#!/usr/bin/env bash
## run-test-udp.sh: Run the UDP test with a local venv.
##
## Usage:
## ./tests/run-test-udp.sh
##
## This script creates .venv in the sidecar directory if missing,
## installs requirements.txt, and runs tests/test_udp.py.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SIDECAR_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)"
VENV_DIR="${SIDECAR_DIR}/.venv"
if [[ ! -x "${VENV_DIR}/bin/python" ]]; then
    python3 -m venv "${VENV_DIR}"
fi
"${VENV_DIR}/bin/pip" install -r "${SIDECAR_DIR}/requirements.txt"
exec "${VENV_DIR}/bin/python" "${SCRIPT_DIR}/test_udp.py"

22
sidecar/tests/run-validate.sh Executable file

@ -0,0 +1,22 @@
#!/usr/bin/env bash
## run-validate.sh: Run the sidecar validator with a local venv.
##
## Usage:
## ./tests/run-validate.sh
##
## This script creates .venv in the sidecar directory if missing,
## installs requirements.txt, and runs tests/validate.py.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SIDECAR_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)"
VENV_DIR="${SIDECAR_DIR}/.venv"
if [[ ! -x "${VENV_DIR}/bin/python" ]]; then
    python3 -m venv "${VENV_DIR}"
fi
"${VENV_DIR}/bin/pip" install -r "${SIDECAR_DIR}/requirements.txt"
exec "${VENV_DIR}/bin/python" "${SCRIPT_DIR}/validate.py"

43
sidecar/tests/test_udp.py Normal file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
"""
Test UDP mode by sending sample impstats via UDP.
"""
import os
import socket
import time
import urllib.request
from urllib.parse import urlparse

script_dir = os.path.dirname(os.path.abspath(__file__))
sidecar_dir = os.path.dirname(script_dir)

with open(os.path.join(sidecar_dir, "examples/sample-impstats.json"), "r") as f:
    lines = f.readlines()

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_addr = os.getenv("IMPSTATS_UDP_ADDR", "127.0.0.1")
udp_port = int(os.getenv("IMPSTATS_UDP_PORT", "19090"))

print(f"Sending {len(lines)} lines via UDP to {udp_addr}:{udp_port}...")
message = "".join(lines)
sock.sendto(message.encode("utf-8"), (udp_addr, udp_port))

print("Sent! Waiting 6 seconds for burst completion timeout...")
time.sleep(6)

print("Fetching metrics...")
try:
    listen_addr = os.getenv("LISTEN_ADDR", "127.0.0.1")
    if listen_addr == "0.0.0.0":
        listen_addr = "127.0.0.1"
    listen_port = os.getenv("LISTEN_PORT", "9898")
    url = f"http://{listen_addr}:{listen_port}/health"
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme}")
    with urllib.request.urlopen(url, timeout=4) as response:  # nosec B310
        print(response.read().decode("utf-8"))
except Exception as e:
    print(f"Error: {e}")
    raise SystemExit(1)  # exit nonzero so failures are visible to callers
finally:
    sock.close()
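The 6-second wait in the test above is keyed to STATS_COMPLETE_TIMEOUT=5 from start-udp.sh: the exporter buffers incoming UDP lines and treats a quiet gap as the end of one impstats burst. A minimal sketch of that buffering idea (a hypothetical `collect_burst` helper, not the exporter's actual code):

```python
import time

def collect_burst(recv_line, quiet_timeout=5.0, max_lines=10000):
    """Buffer lines until no new line arrives for quiet_timeout seconds.

    recv_line() returns the next line, or None if its own short poll
    produced nothing; max_lines mirrors MAX_BURST_BUFFER_LINES and caps
    the buffer so a flood cannot exhaust memory.
    """
    lines = []
    last_seen = time.monotonic()
    while time.monotonic() - last_seen < quiet_timeout:
        line = recv_line()
        if line is None:
            continue
        last_seen = time.monotonic()
        if len(lines) < max_lines:
            lines.append(line)  # lines beyond the cap are dropped
    return lines
```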

69
sidecar/tests/validate.py Normal file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
Quick validation script to test parsing without starting HTTP server.
"""
import os
import sys

# Run from the sidecar directory so the rsyslog_exporter import and the
# relative examples/ paths below resolve regardless of the caller's CWD
# (the run-validate.sh wrapper does not cd before exec'ing this script).
os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
sys.path.insert(0, ".")

from rsyslog_exporter import parse_json_impstats, parse_cee_impstats, parse_prometheus_impstats


def test_json_parsing():
    """Test JSON format parsing."""
    print("Testing JSON parsing...")
    metrics = parse_json_impstats("examples/sample-impstats.json")
    print(f"  ✓ Parsed {len(metrics)} metrics from JSON")
    # Verify some expected metrics
    metric_names = [m.name for m in metrics]
    assert any("rsyslog_core_utime" in n for n in metric_names), "Missing core metrics"
    assert any("rsyslog_core_action_processed" in n for n in metric_names), "Missing action metrics"
    print("  ✓ Found expected metric types")
    return True


def test_cee_parsing():
    """Test CEE format parsing."""
    print("Testing CEE parsing...")
    metrics = parse_cee_impstats("examples/sample-impstats.cee")
    print(f"  ✓ Parsed {len(metrics)} metrics from CEE")
    assert len(metrics) > 0, "No metrics parsed from CEE"
    return True


def test_prometheus_parsing():
    """Test Prometheus native format parsing."""
    print("Testing Prometheus parsing...")
    metrics = parse_prometheus_impstats("examples/sample-impstats.prom")
    print(f"  ✓ Parsed {len(metrics)} metrics from Prometheus format")
    # Check that we got the expected metrics
    metric_names = [m.name for m in metrics]
    assert "rsyslog_global_utime" in metric_names, "Missing global utime"
    assert "rsyslog_action_processed" in metric_names, "Missing action processed"
    print("  ✓ Found expected metrics")
    return True


def main():
    print("=" * 60)
    print("rsyslog Prometheus Exporter - Parser Validation")
    print("=" * 60)
    print()
    try:
        test_json_parsing()
        print()
        test_cee_parsing()
        print()
        test_prometheus_parsing()
        print()
        print("=" * 60)
        print("All parser tests passed! ✓")
        print("=" * 60)
        return 0
    except Exception as e:
        print()
        print(f"✗ Test failed: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())
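Of the three parsers validate.py exercises, parse_prometheus_impstats consumes the Prometheus text exposition format. A simplified stand-in for such a parser (assuming `name{labels} value` lines with `#` comments ignored; this is not the exporter's actual implementation):

```python
import re

# metric name, optional {labels} block, whitespace, numeric value
LINE_RE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)'
    r'(?P<labels>\{[^}]*\})?\s+(?P<value>\S+)$'
)

def parse_prom_text(text):
    """Parse simple exposition lines into (name, labels, value) tuples."""
    metrics = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip HELP/TYPE/comments
            continue
        m = LINE_RE.match(line)
        if m:
            metrics.append((m.group("name"),
                            m.group("labels") or "",
                            float(m.group("value"))))
    return metrics
```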