rsyslog/sidecar/tests/test_udp.py
Andre Lorbach eb7b57ed1d sidecar: add rsyslog impstats exporter
Make a production-ready sidecar exporter to simplify operations and
support consistent deployments across hosts and containers.
Before: no sidecar tooling shipped. After: exporter, scripts, and docs.
Impact: new sidecar defaults to udp on loopback; tests add venv runners.

Implement JSON/CEE/Prometheus parsing with counter/gauge heuristics.
Add UDP burst buffering, size limits, and optional source filtering.
Expose /metrics and /health with parse and drop status reporting.
Provide Dockerfile, docker-compose examples, and systemd install flow.
Add validation and UDP test runners plus sample impstats data.
Document production setup, security posture, and file growth caveats.

With the help of AI-Agents: GitHub Copilot

See also: https://github.com/rsyslog/rsyslog/issues/5824
2026-01-23 07:54:56 +00:00

44 lines
1.3 KiB
Python

#!/usr/bin/env python3
"""
Test UDP mode by sending sample impstats via UDP.
"""
import os
import socket
import time
import urllib.request
from urllib.parse import urlparse
script_dir = os.path.dirname(os.path.abspath(__file__))
sidecar_dir = os.path.dirname(script_dir)
with open(os.path.join(sidecar_dir, "examples/sample-impstats.json"), "r") as f:
lines = f.readlines()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_addr = os.getenv("IMPSTATS_UDP_ADDR", "127.0.0.1")
udp_port = int(os.getenv("IMPSTATS_UDP_PORT", "19090"))
print(f"Sending {len(lines)} lines via UDP to {udp_addr}:{udp_port}...")
message = "".join(lines)
sock.sendto(message.encode("utf-8"), (udp_addr, udp_port))
print("Sent! Waiting 6 seconds for burst completion timeout...")
time.sleep(6)
print("Fetching metrics...")
try:
listen_addr = os.getenv("LISTEN_ADDR", "127.0.0.1")
if listen_addr == "0.0.0.0":
listen_addr = "127.0.0.1"
listen_port = os.getenv("LISTEN_PORT", "9898")
url = f"http://{listen_addr}:{listen_port}/health"
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"Unsupported URL scheme: {parsed.scheme}")
response = urllib.request.urlopen(url, timeout=4) # nosec B310
print(response.read().decode("utf-8"))
except Exception as e:
print(f"Error: {e}")
sock.close()