rsyslog/tests/snmptrapreceiverv2.py
Rainer Gerhards 5d936dab4c tests: tighten snmptrapreceiverv2 error handling
Why

The autofix branch corrected several issues in the SNMP trap receiver
helper, but the combined change should read as one coherent fix set.
The final shutdown path also needed to avoid empty exception handling.

Impact: port parsing is validated, comment typos stay fixed, and
shutdown cleanup is now explicit and centralized.

Before/After: several narrow autofix commits are now one change that
hardens argument handling and simplifies shutdown cleanup.

Technical Overview

Keep the integer validation added for the SNMP port argument.
Retain the existing comment spelling fixes in the helper script.
Move marker-file removal into a shared cleanup helper.
Handle missing .started files explicitly instead of swallowing the
FileNotFoundError silently.
Run marker cleanup and dispatcher shutdown from a single finally block.
Keep dispatcher shutdown and exception propagation behavior unchanged.
Validation was limited to python3 -m py_compile for this helper script.

With the help of AI-Agents: Codex
2026-04-13 12:57:32 +02:00

159 lines
5.6 KiB
Python

# call this via "python[3] script name"
import sys
import asyncio
import os
# Add debug output for startup issues
print("SNMP Trap Receiver v2 starting...", file=sys.stderr)
print(f"Python version: {sys.version}", file=sys.stderr)
print(f"Arguments: {sys.argv}", file=sys.stderr)
# Ensure asyncio event loop is properly set up for Python 3.11+
if sys.version_info >= (3, 11):
try:
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
print("Set asyncio event loop policy for Python 3.11+", file=sys.stderr)
except Exception as e:
print(f"Warning: Could not set asyncio event loop policy: {e}", file=sys.stderr)
try:
from pysnmp.entity import engine, config
from pysnmp.carrier.asyncio.dgram import udp # Changed from asyncore to asyncio
from pysnmp.entity.rfc3413 import ntfrcv
from pysnmp.smi import builder, view, compiler, rfc1902
from pyasn1.type.univ import OctetString
print("All pysnmp imports successful", file=sys.stderr)
except ImportError as e:
print(f"Import error: {e}", file=sys.stderr)
sys.exit(1)
# Global variables
snmpport = 10162
snmpip = "127.0.0.1"
szOutputfile = "snmp.out"
szSnmpLogfile = "snmp_server.log"
# For verbose output
bDebug = False
# Read command line params
if len(sys.argv) > 1:
try:
snmpport = int(sys.argv[1])
except ValueError:
print(
f"Invalid port '{sys.argv[1]}'. Expected an integer, e.g. 10162.",
file=sys.stderr
)
sys.exit(1)
if len(sys.argv) > 2:
snmpip = sys.argv[2]
if len(sys.argv) > 3:
szOutputfile = sys.argv[3]
if len(sys.argv) > 4:
szSnmpLogfile = sys.argv[4]
# Create output files
print(f"Creating output files: {szOutputfile}, {szSnmpLogfile}", file=sys.stderr)
outputFile = open(szOutputfile,"w+")
logFile = open(szSnmpLogfile,"a+")
print("Output files created successfully", file=sys.stderr)
# Assemble MIB viewer
mibBuilder = builder.MibBuilder()
compiler.addMibCompiler(mibBuilder, sources=['file:///usr/share/snmp/mibs', 'file:///var/lib/snmp/mibs', '/usr/local/share/snmp/mibs/'])
mibViewController = view.MibViewController(mibBuilder)
# Pre-load MIB modules we expect to work with
try:
mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'SYSLOG-MSG-MIB')
except Exception:
print("Failed loading MIBs")
# Create SNMP engine with autogenerated engineID and pre-bound to socket transport dispatcher
snmpEngine = engine.SnmpEngine()
# Transport setup
# UDP over IPv4, add listening interface/port
print(f"Setting up transport for {snmpip}:{snmpport}", file=sys.stderr)
config.addTransport(
snmpEngine,
udp.domainName + (1,),
udp.UdpTransport().openServerMode((snmpip, snmpport))
)
print("Transport setup completed", file=sys.stderr)
# SNMPv1/2c setup
# SecurityName <-> CommunityName mapping
config.addV1System(snmpEngine, 'my-area', 'public')
print("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
logFile.write("Started SNMP Trap Receiver: %s, %s, Output: %s" % (snmpport, snmpip, szOutputfile))
logFile.flush()
# Add PID file creation after startup message
print("Creating .started file", file=sys.stderr)
with open(szSnmpLogfile + ".started", "w") as f:
f.write(str(os.getpid()))
print("Started file created successfully", file=sys.stderr)
def cleanup_started_file():
try:
os.remove(szSnmpLogfile + ".started")
except FileNotFoundError:
print(
f"Startup marker {szSnmpLogfile}.started was already absent during shutdown",
file=sys.stderr
)
# Callback function for receiving notifications
# noinspection PyUnusedLocal,PyUnusedLocal,PyUnusedLocal
def cbReceiverSnmp(snmpEngine, stateReference, contextEngineId, contextName, varBinds, cbCtx):
# Change getTransportInfo to get_transport_info
transportDomain, transportAddress = snmpEngine.msgAndPduDsp.get_transport_info(stateReference)
if (bDebug):
szDebug = str("Notification From: %s, Domain: %s, SNMP Engine: %s, Context: %s" %
(transportAddress, transportDomain, contextEngineId.prettyPrint(), contextName.prettyPrint()))
print(szDebug)
logFile.write(szDebug)
logFile.flush()
# Create output String
szOut = "Trap Source{}, Trap OID {}".format(transportAddress, transportDomain)
varBinds = [rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds]
for name, val in varBinds:
# Append to output String
szOut = szOut + ", Oid: {}, Value: {}".format(name.prettyPrint(), val.prettyPrint())
if isinstance(val, OctetString):
if (name.prettyPrint() != "SNMP-COMMUNITY-MIB::snmpTrapAddress.0"):
szOctets = val.asOctets()#.rstrip('\r').rstrip('\n')
szOut = szOut + ", Octets: {}".format(szOctets)
if (bDebug):
print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
outputFile.write(szOut)
if "\n" not in szOut:
outputFile.write("\n")
outputFile.flush()
# Register SNMP Application at the SNMP engine
print("Registering notification receiver", file=sys.stderr)
ntfrcv.NotificationReceiver(snmpEngine, cbReceiverSnmp)
# Run I/O dispatcher which would receive queries and send confirmations
print("Starting transport dispatcher", file=sys.stderr)
try:
snmpEngine.transportDispatcher.runDispatcher()
except KeyboardInterrupt:
print("Received keyboard interrupt, shutting down", file=sys.stderr)
except Exception as e:
print(f"Exception in dispatcher: {e}", file=sys.stderr)
raise
finally:
cleanup_started_file()
snmpEngine.transportDispatcher.closeDispatcher()