mirror of
https://github.com/rsyslog/rsyslog.git
synced 2026-04-23 13:48:12 +02:00
Refactor: rename omotlp module to omotel
Rename the OpenTelemetry output module from "omotlp" to "omotel" across the entire codebase. This includes directory, file, function, type, and constant names, as well as build system configuration, documentation, and test files. Changes: - Directory: plugins/omotlp/ → plugins/omotel/ - Source files: omotlp.c → omotel.c, omotlp_http.c → omotel_http.c, omotlp_http.h → omotel_http.h - Code: all function names, types, constants (OMOTLP_* → OMOTEL_*) - Build: configure.ac (--enable-omotlp → --enable-omotel, OMOTLP_HTTP_* → OMOTEL_HTTP_*), Makefile.am files - Docs: omotlp.rst → omotel.rst, all examples updated - Tests: omotlp-*.sh → omotel-*.sh, content updated - Tasks: omotlp_*.md → omotel_*.md - Config: module_map.yaml, AGENTS.md Impact: - Module name in rsyslog.conf: "omotlp" → "omotel" - Build flag: --enable-omotlp → --enable-omotel - No functional changes, pure refactoring closes: https://github.com/rsyslog/rsyslog/issues/6361
This commit is contained in:
parent
bf57b7321c
commit
c9f56709a7
6
.github/workflows/run_checks.yml
vendored
6
.github/workflows/run_checks.yml
vendored
@ -145,7 +145,7 @@ jobs:
|
||||
# It is better to run at least the majority of checks than to postpone that
|
||||
# any longer. 2025-01-31 RGerhards
|
||||
export RSYSLOG_CONFIGURE_OPTIONS_EXTRA="--enable-omazureeventhubs --enable-imdtls \
|
||||
--enable-omdtls --enable-omotlp --disable-omamqp1 --disable-snmp --disable-kafka-tests \
|
||||
--enable-omdtls --enable-omotel --disable-omamqp1 --disable-snmp --disable-kafka-tests \
|
||||
--disable-elasticsearch-tests --enable-mmsnareparse"
|
||||
;;
|
||||
'ubuntu_22_distcheck')
|
||||
@ -161,7 +161,7 @@ jobs:
|
||||
export CC='clang'
|
||||
export RSYSLOG_CONFIGURE_OPTIONS_EXTRA="--disable-elasticsearch-tests \
|
||||
--disable-libfaketime --without-valgrind-testbench --disable-valgrind \
|
||||
--enable-omotlp \
|
||||
--enable-omotel \
|
||||
--disable-kafka-tests --enable-imdtls --enable-omdtls \
|
||||
--enable-mmsnareparse"
|
||||
export CFLAGS="-fstack-protector -D_FORTIFY_SOURCE=2 \
|
||||
@ -183,7 +183,7 @@ jobs:
|
||||
# imhttp disabled because of race in civetweb (need to consider different lib)
|
||||
export RSYSLOG_CONFIGURE_OPTIONS_EXTRA="--disable-elasticsearch-tests --enable-imfile-tests \
|
||||
--disable-impstats --disable-kafka-tests --disable-mmpstrucdata \
|
||||
--enable-omotlp \
|
||||
--enable-omotel \
|
||||
--disable-clickhouse --disable-clickhouse-tests --disable-kafka-tests \
|
||||
--disable-libfaketime --disable-imhttp \
|
||||
--without-valgrind-testbench --disable-valgrind \
|
||||
|
||||
@ -109,7 +109,7 @@ When the user says the codeword "BUILD" optionally followed by configure options
|
||||
Examples:
|
||||
- `BUILD` - Uses default testbench configuration
|
||||
- `BUILD --enable-testbench --enable-mmsnareparse` - Custom configuration with mmsnareparse module
|
||||
- `BUILD --enable-testbench --enable-imdiag --enable-omstdout --enable-mmsnareparse --enable-omotlp` - Multiple modules
|
||||
- `BUILD --enable-testbench --enable-imdiag --enable-omstdout --enable-mmsnareparse --enable-omotel` - Multiple modules
|
||||
|
||||
### `TEST [test-script-names]`
|
||||
|
||||
@ -397,7 +397,7 @@ touch /tmp/rsyslog_base_env.flag
|
||||
# For multiple modules:
|
||||
./configure --enable-testbench --enable-imdiag --enable-omstdout \
|
||||
--enable-mmsnareparse \
|
||||
--enable-omotlp \
|
||||
--enable-omotel \
|
||||
--enable-imhttp
|
||||
```
|
||||
|
||||
|
||||
@ -1214,8 +1214,8 @@ if ENABLE_OMHDFS
|
||||
SUBDIRS += plugins/omhdfs
|
||||
endif
|
||||
|
||||
if ENABLE_OMOTLP
|
||||
SUBDIRS += plugins/omotlp
|
||||
if ENABLE_OMOTEL
|
||||
SUBDIRS += plugins/omotel
|
||||
endif
|
||||
|
||||
if ENABLE_OMJOURNAL
|
||||
|
||||
30
configure.ac
30
configure.ac
@ -2565,23 +2565,23 @@ if test "x$enable_omhdfs"; then
|
||||
fi
|
||||
AM_CONDITIONAL(ENABLE_OMHDFS, test x$enable_omhdfs = xyes)
|
||||
|
||||
AC_ARG_ENABLE(omotlp,
|
||||
[AS_HELP_STRING([--enable-omotlp],[Enable OpenTelemetry output module @<:@default=no@:>@])],
|
||||
AC_ARG_ENABLE(omotel,
|
||||
[AS_HELP_STRING([--enable-omotel],[Enable OpenTelemetry output module @<:@default=no@:>@])],
|
||||
[case "${enableval}" in
|
||||
yes) enable_omotlp="yes" ;;
|
||||
no) enable_omotlp="no" ;;
|
||||
*) AC_MSG_ERROR(bad value ${enableval} for --enable-omotlp) ;;
|
||||
yes) enable_omotel="yes" ;;
|
||||
no) enable_omotel="no" ;;
|
||||
*) AC_MSG_ERROR(bad value ${enableval} for --enable-omotel) ;;
|
||||
esac],
|
||||
[enable_omotlp=no]
|
||||
[enable_omotel=no]
|
||||
)
|
||||
OMOTLP_HTTP_CFLAGS=""
|
||||
OMOTLP_HTTP_LIBS=""
|
||||
if test "x$enable_omotlp" = "xyes"; then
|
||||
PKG_CHECK_MODULES([OMOTLP_HTTP], [libcurl libfastjson])
|
||||
OMOTEL_HTTP_CFLAGS=""
|
||||
OMOTEL_HTTP_LIBS=""
|
||||
if test "x$enable_omotel" = "xyes"; then
|
||||
PKG_CHECK_MODULES([OMOTEL_HTTP], [libcurl libfastjson])
|
||||
fi
|
||||
AC_SUBST([OMOTLP_HTTP_CFLAGS])
|
||||
AC_SUBST([OMOTLP_HTTP_LIBS])
|
||||
AM_CONDITIONAL(ENABLE_OMOTLP, test x$enable_omotlp = xyes)
|
||||
AC_SUBST([OMOTEL_HTTP_CFLAGS])
|
||||
AC_SUBST([OMOTEL_HTTP_LIBS])
|
||||
AM_CONDITIONAL(ENABLE_OMOTEL, test x$enable_omotel = xyes)
|
||||
|
||||
# support for kafka input output
|
||||
AC_ARG_ENABLE(omkafka,
|
||||
@ -3209,7 +3209,7 @@ AC_CONFIG_FILES([Makefile \
|
||||
plugins/immark/Makefile \
|
||||
plugins/imklog/Makefile \
|
||||
plugins/omhdfs/Makefile \
|
||||
plugins/omotlp/Makefile \
|
||||
plugins/omotel/Makefile \
|
||||
plugins/omkafka/Makefile \
|
||||
plugins/omprog/Makefile \
|
||||
plugins/mmexternal/Makefile \
|
||||
@ -3358,7 +3358,7 @@ echo " omstdout module will be compiled: $enable_omstdout"
|
||||
echo " omsendertrack module will be compiled: $enable_omsendertrack"
|
||||
echo " omjournal module will be compiled: $enable_omjournal"
|
||||
echo " omhdfs module will be compiled: $enable_omhdfs"
|
||||
echo " omotlp module will be compiled: $enable_omotlp"
|
||||
echo " omotel module will be compiled: $enable_omotel"
|
||||
echo " omelasticsearch module will be compiled: $enable_elasticsearch"
|
||||
echo " omclickhouse module will be compiled: $enable_clickhouse"
|
||||
echo " omhttp module will be compiled: $enable_omhttp"
|
||||
|
||||
@ -60,8 +60,8 @@ mmjsonrewrite:
|
||||
notes:
|
||||
- Expands dotted JSON property names into nested containers per message.
|
||||
|
||||
omotlp:
|
||||
paths: ["plugins/omotlp/"]
|
||||
omotel:
|
||||
paths: ["plugins/omotel/"]
|
||||
requires_serialization: false
|
||||
notes:
|
||||
- Transport resources are not yet active; introduce pData locking once they appear.
|
||||
|
||||
@ -1,19 +1,19 @@
|
||||
.. _module-omotlp:
|
||||
.. _module-omotel:
|
||||
|
||||
.. meta::
|
||||
:description: Scaffolding for the omotlp OpenTelemetry output module.
|
||||
:keywords: omotlp, otlp, opentelemetry, rsyslog module
|
||||
:description: Scaffolding for the omotel OpenTelemetry output module.
|
||||
:keywords: omotel, otlp, opentelemetry, rsyslog module
|
||||
|
||||
.. summary-start
|
||||
|
||||
Phase 1 of the omotlp output plugin streams OpenTelemetry logs over
|
||||
Phase 1 of the omotel output plugin streams OpenTelemetry logs over
|
||||
OTLP/HTTP JSON with configurable batching, gzip compression, retry/backoff
|
||||
controls, TLS/mTLS support for secure HTTPS connections, and HTTP proxy
|
||||
support for corporate networks and firewalled environments.
|
||||
|
||||
.. summary-end
|
||||
|
||||
omotlp: OpenTelemetry output module (preview)
|
||||
omotel: OpenTelemetry output module (preview)
|
||||
=============================================
|
||||
|
||||
.. warning::
|
||||
@ -25,7 +25,7 @@ omotlp: OpenTelemetry output module (preview)
|
||||
Overview
|
||||
--------
|
||||
|
||||
``omotlp`` prepares rsyslog for native :abbr:`OTLP (OpenTelemetry Log Protocol)`
|
||||
``omotel`` prepares rsyslog for native :abbr:`OTLP (OpenTelemetry Log Protocol)`
|
||||
exports. Phase 1 focuses on the OTLP/HTTP JSON transport path: the module maps
|
||||
rsyslog metadata into the canonical OTLP JSON structure, joins the configured
|
||||
endpoint and path, and posts batches of rendered payloads via ``libcurl`` using
|
||||
@ -38,8 +38,8 @@ Availability
|
||||
------------
|
||||
|
||||
The module is built only when ``./configure`` is invoked with
|
||||
``--enable-omotlp=yes`` and both ``libcurl`` and ``libfastjson`` are present.
|
||||
The default ``--enable-omotlp`` setting is ``no``, so you must opt in
|
||||
``--enable-omotel=yes`` and both ``libcurl`` and ``libfastjson`` are present.
|
||||
The default ``--enable-omotel`` setting is ``no``, so you must opt in
|
||||
explicitly. The HTTP transport depends on ``libcurl`` at runtime.
|
||||
|
||||
Configuration
|
||||
@ -49,7 +49,7 @@ The action parameters listed below mirror the current transport design. All
|
||||
parameters are optional and fall back to sensible defaults inspired by the
|
||||
`OTLP specification <https://opentelemetry.io/docs/specs/otlp/>`_.
|
||||
|
||||
.. csv-table:: ``omotlp`` action parameters
|
||||
.. csv-table:: ``omotel`` action parameters
|
||||
:header: "Parameter", "Type", "Default", "Description"
|
||||
:widths: auto
|
||||
|
||||
@ -120,9 +120,9 @@ other non-success responses discard the message and log an error.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
protocol="http/json"
|
||||
@ -152,20 +152,20 @@ Trace Correlation Example
|
||||
|
||||
The following example demonstrates how to extract trace context from JSON messages
|
||||
and populate OTLP trace correlation fields. The trace context is extracted from
|
||||
the message using ``mmjsonparse`` and then exported via ``omotlp`` with trace
|
||||
the message using ``mmjsonparse`` and then exported via ``omotel`` with trace
|
||||
correlation enabled.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="mmjsonparse")
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
|
||||
# Parse JSON messages to extract trace properties
|
||||
action(type="mmjsonparse" mode="find-json")
|
||||
|
||||
# Export with trace correlation
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
trace_id.property="trace_id"
|
||||
@ -187,9 +187,9 @@ resource semantic conventions compliance.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
resource='{
|
||||
@ -220,9 +220,9 @@ bundle and optional mutual TLS authentication with client certificates.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
tls.cacert="/etc/ssl/certs/ca-bundle.pem"
|
||||
@ -273,14 +273,14 @@ Proxy Configuration Example
|
||||
|
||||
The following example demonstrates how to configure HTTP proxy support for
|
||||
corporate networks or environments that require traffic to pass through a proxy
|
||||
server. Proxy support enables omotlp to work through firewalls, network gateways,
|
||||
server. Proxy support enables omotel to work through firewalls, network gateways,
|
||||
and corporate proxies.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
proxy="http://proxy.example.com:8080"
|
||||
@ -290,9 +290,9 @@ For proxies that require authentication, provide both username and password:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
proxy="http://proxy.example.com:8080"
|
||||
@ -331,7 +331,7 @@ header for each HTTP request.
|
||||
When a proxy is configured, all HTTP requests to the OTLP collector endpoint
|
||||
are routed through the proxy server. The proxy acts as an intermediary,
|
||||
forwarding requests to the target collector and returning responses. This
|
||||
enables omotlp to work in environments where:
|
||||
enables omotel to work in environments where:
|
||||
|
||||
- Direct connections to the collector are blocked by firewall rules
|
||||
- Network traffic must pass through a corporate proxy gateway
|
||||
@ -375,7 +375,7 @@ log data model. Each batch wraps log records in the following hierarchy:
|
||||
:widths: auto
|
||||
|
||||
"``service.name``", "``rsyslog``"
|
||||
"``telemetry.sdk.name``", "``rsyslog-omotlp``"
|
||||
"``telemetry.sdk.name``", "``rsyslog-omotel``"
|
||||
"``telemetry.sdk.language``", "``C``"
|
||||
"``telemetry.sdk.version``", "rsyslog version string"
|
||||
"``host.name``", "hostname (only if uniform across all records in batch)"
|
||||
@ -386,7 +386,7 @@ log data model. Each batch wraps log records in the following hierarchy:
|
||||
:header: "Field", "Value"
|
||||
:widths: auto
|
||||
|
||||
"``scope.name``", "``rsyslog.omotlp``"
|
||||
"``scope.name``", "``rsyslog.omotel``"
|
||||
"``scope.version``", "rsyslog version string"
|
||||
|
||||
**Per-record attributes** (derived from syslog metadata):
|
||||
@ -448,9 +448,9 @@ mapping to match collector expectations or custom severity schemes:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
module(load="omotlp")
|
||||
module(load="omotel")
|
||||
action(
|
||||
type="omotlp"
|
||||
type="omotel"
|
||||
endpoint="https://otel-collector:4318"
|
||||
path="/v1/logs"
|
||||
attributeMap='{
|
||||
@ -486,8 +486,8 @@ Statistic Counter
|
||||
=================
|
||||
|
||||
This plugin maintains :doc:`statistics <../rsyslog_statistic_counter>` for each
|
||||
worker instance. The statistic origin is named "omotlp" with the instance URL
|
||||
appended (e.g., "omotlp-http://127.0.0.1:4318/v1/logs"). Statistics are visible
|
||||
worker instance. The statistic origin is named "omotel" with the instance URL
|
||||
appended (e.g., "omotel-http://127.0.0.1:4318/v1/logs"). Statistics are visible
|
||||
via the :doc:`impstats <../modules/impstats>` module when enabled.
|
||||
|
||||
The following counters are tracked per worker instance:
|
||||
@ -519,4 +519,4 @@ The following counters are tracked per worker instance:
|
||||
|
||||
Counters are thread-safe and use atomic operations for updates. Each worker
|
||||
instance maintains its own statistics object, allowing operators to monitor
|
||||
performance per action instance when multiple omotlp actions are configured.
|
||||
performance per action instance when multiple omotel actions are configured.
|
||||
@ -1,14 +1,14 @@
|
||||
# AGENTS.md – omotlp output module
|
||||
# AGENTS.md – omotel output module
|
||||
|
||||
These instructions apply to files inside `plugins/omotlp/`.
|
||||
These instructions apply to files inside `plugins/omotel/`.
|
||||
|
||||
## Development notes
|
||||
- Keep the module pure C unless the optional gRPC shim is enabled.
|
||||
- Update `MODULE_METADATA.yaml` and the user documentation when adding new
|
||||
configuration parameters or behavioral changes.
|
||||
- Refresh the concurrency note in `omotlp.c` if locking expectations change.
|
||||
- Refresh the concurrency note in `omotel.c` if locking expectations change.
|
||||
- Run `devtools/format-code.sh` before committing.
|
||||
|
||||
## Testing
|
||||
- Run `tests/omotlp-http-batch.sh` to exercise the HTTP batching, gzip, and
|
||||
- Run `tests/omotel-http-batch.sh` to exercise the HTTP batching, gzip, and
|
||||
retry path.
|
||||
@ -1,25 +1,25 @@
|
||||
pkglib_LTLIBRARIES = omotlp.la
|
||||
pkglib_LTLIBRARIES = omotel.la
|
||||
|
||||
omotlp_la_SOURCES = \
|
||||
omotlp.c \
|
||||
omotel_la_SOURCES = \
|
||||
omotel.c \
|
||||
otlp_json.c \
|
||||
omotlp_http.c
|
||||
omotel_http.c
|
||||
|
||||
noinst_HEADERS = \
|
||||
otlp_json.h \
|
||||
omotlp_http.h
|
||||
omotel_http.h
|
||||
|
||||
omotlp_la_CPPFLAGS = \
|
||||
omotel_la_CPPFLAGS = \
|
||||
-I$(srcdir) \
|
||||
-I$(top_srcdir) \
|
||||
-I$(top_srcdir)/runtime \
|
||||
-I$(top_srcdir)/grammar \
|
||||
$(RSRT_CFLAGS) \
|
||||
$(OMOTLP_HTTP_CFLAGS)
|
||||
$(OMOTEL_HTTP_CFLAGS)
|
||||
|
||||
omotlp_la_LDFLAGS = -module -avoid-version
|
||||
omotel_la_LDFLAGS = -module -avoid-version
|
||||
|
||||
omotlp_la_LIBADD = $(OMOTLP_HTTP_LIBS) $(ZLIB_LIBS)
|
||||
omotel_la_LIBADD = $(OMOTEL_HTTP_LIBS) $(ZLIB_LIBS)
|
||||
|
||||
EXTRA_DIST = \
|
||||
AGENTS.md \
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* @file omotlp.c
|
||||
* @file omotel.c
|
||||
* @brief OpenTelemetry (OTLP) output module for rsyslog
|
||||
*
|
||||
* This module provides native OpenTelemetry log export capabilities using
|
||||
@ -58,22 +58,22 @@
|
||||
#include "statsobj.h"
|
||||
|
||||
#include "otlp_json.h"
|
||||
#include "omotlp_http.h"
|
||||
#include "omotel_http.h"
|
||||
|
||||
MODULE_TYPE_OUTPUT;
|
||||
MODULE_TYPE_NOKEEP;
|
||||
MODULE_CNFNAME("omotlp")
|
||||
MODULE_CNFNAME("omotel")
|
||||
|
||||
DEF_OMOD_STATIC_DATA;
|
||||
DEFobjCurrIf(datetime);
|
||||
DEFobjCurrIf(prop);
|
||||
DEFobjCurrIf(statsobj);
|
||||
|
||||
typedef enum omotlp_compression_e {
|
||||
OMOTLP_COMPRESSION_UNSET = 0,
|
||||
OMOTLP_COMPRESSION_NONE,
|
||||
OMOTLP_COMPRESSION_GZIP,
|
||||
} omotlp_compression_t;
|
||||
typedef enum omotel_compression_e {
|
||||
OMOTEL_COMPRESSION_UNSET = 0,
|
||||
OMOTEL_COMPRESSION_NONE,
|
||||
OMOTEL_COMPRESSION_GZIP,
|
||||
} omotel_compression_t;
|
||||
|
||||
typedef struct header_list_s {
|
||||
char **values;
|
||||
@ -104,8 +104,8 @@ typedef struct severity_map_s {
|
||||
} severity_map_t;
|
||||
|
||||
enum {
|
||||
OMOTLP_OMSR_IDX_MESSAGE = 0,
|
||||
OMOTLP_OMSR_IDX_BODY = 1,
|
||||
OMOTEL_OMSR_IDX_MESSAGE = 0,
|
||||
OMOTEL_OMSR_IDX_BODY = 1,
|
||||
};
|
||||
|
||||
typedef struct _instanceData {
|
||||
@ -122,7 +122,7 @@ typedef struct _instanceData {
|
||||
long retryMaxMs;
|
||||
unsigned int retryMaxRetries;
|
||||
unsigned int retryJitterPercent;
|
||||
omotlp_compression_t compression_mode;
|
||||
omotel_compression_t compression_mode;
|
||||
int compressionConfigured;
|
||||
int headersConfigured;
|
||||
int bearerConfigured;
|
||||
@ -154,8 +154,8 @@ typedef struct _instanceData {
|
||||
uchar *proxyPassword;
|
||||
} instanceData;
|
||||
|
||||
typedef struct omotlp_batch_entry_s {
|
||||
omotlp_log_record_t record;
|
||||
typedef struct omotel_batch_entry_s {
|
||||
omotel_log_record_t record;
|
||||
char *body;
|
||||
char *hostname;
|
||||
char *app_name;
|
||||
@ -163,20 +163,20 @@ typedef struct omotlp_batch_entry_s {
|
||||
char *msg_id;
|
||||
char *trace_id; /* Allocated string for trace_id */
|
||||
char *span_id; /* Allocated string for span_id */
|
||||
} omotlp_batch_entry_t;
|
||||
} omotel_batch_entry_t;
|
||||
|
||||
typedef struct omotlp_batch_state_s {
|
||||
omotlp_batch_entry_t *entries;
|
||||
typedef struct omotel_batch_state_s {
|
||||
omotel_batch_entry_t *entries;
|
||||
size_t count;
|
||||
size_t capacity;
|
||||
size_t estimated_bytes;
|
||||
long long first_enqueue_ms;
|
||||
} omotlp_batch_state_t;
|
||||
} omotel_batch_state_t;
|
||||
|
||||
typedef struct wrkrInstanceData {
|
||||
instanceData *pData;
|
||||
omotlp_http_client_t *http_client;
|
||||
omotlp_batch_state_t batch;
|
||||
omotel_http_client_t *http_client;
|
||||
omotel_batch_state_t batch;
|
||||
pthread_t flush_thread;
|
||||
pthread_mutex_t batch_mutex;
|
||||
int flush_thread_running;
|
||||
@ -324,7 +324,7 @@ static rsRetVal applyEnvDefaults(instanceData *pData) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!pData->compressionConfigured && pData->compression_mode == OMOTLP_COMPRESSION_UNSET) {
|
||||
if (!pData->compressionConfigured && pData->compression_mode == OMOTEL_COMPRESSION_UNSET) {
|
||||
value = firstPopulatedEnv(compressionEnvVars);
|
||||
if (value != NULL) {
|
||||
CHKiRet(set_compression_mode(pData, value));
|
||||
@ -412,7 +412,7 @@ static rsRetVal validateProtocol(instanceData *pData) {
|
||||
goto finalize_it;
|
||||
}
|
||||
|
||||
LogError(0, RS_RET_NOT_IMPLEMENTED, "omotlp: protocol '%s' is not supported by the scaffolding build",
|
||||
LogError(0, RS_RET_NOT_IMPLEMENTED, "omotel: protocol '%s' is not supported by the scaffolding build",
|
||||
pData->protocol);
|
||||
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
|
||||
|
||||
@ -495,17 +495,17 @@ static const severity_mapping_t severity_lookup[8] = {{24u, "EMERGENCY"}, {23u,
|
||||
{17u, "ERROR"}, {13u, "WARNING"}, {11u, "NOTICE"},
|
||||
{9u, "INFO"}, {5u, "DEBUG"}};
|
||||
|
||||
#define OMOTLP_DEFAULT_BATCH_MAX_ITEMS 512u
|
||||
#define OMOTLP_DEFAULT_BATCH_MAX_BYTES (512u * 1024u)
|
||||
#define OMOTLP_DEFAULT_BATCH_TIMEOUT_MS 5000L
|
||||
#define OMOTLP_DEFAULT_RETRY_INITIAL_MS 1000L
|
||||
#define OMOTLP_DEFAULT_RETRY_MAX_MS 30000L
|
||||
#define OMOTLP_DEFAULT_RETRY_MAX_RETRIES 5u
|
||||
#define OMOTLP_DEFAULT_RETRY_JITTER_PERCENT 20u
|
||||
#define OMOTEL_DEFAULT_BATCH_MAX_ITEMS 512u
|
||||
#define OMOTEL_DEFAULT_BATCH_MAX_BYTES (512u * 1024u)
|
||||
#define OMOTEL_DEFAULT_BATCH_TIMEOUT_MS 5000L
|
||||
#define OMOTEL_DEFAULT_RETRY_INITIAL_MS 1000L
|
||||
#define OMOTEL_DEFAULT_RETRY_MAX_MS 30000L
|
||||
#define OMOTEL_DEFAULT_RETRY_MAX_RETRIES 5u
|
||||
#define OMOTEL_DEFAULT_RETRY_JITTER_PERCENT 20u
|
||||
|
||||
#define OMOTLP_BATCH_BASE_OVERHEAD 256u
|
||||
#define OMOTLP_BATCH_RECORD_OVERHEAD 256u
|
||||
#define OMOTLP_IDLE_FLUSH_INTERVAL_MS 1000L
|
||||
#define OMOTEL_BATCH_BASE_OVERHEAD 256u
|
||||
#define OMOTEL_BATCH_RECORD_OVERHEAD 256u
|
||||
#define OMOTEL_IDLE_FLUSH_INTERVAL_MS 1000L
|
||||
|
||||
static void header_list_init(header_list_t *list) {
|
||||
if (list == NULL) {
|
||||
@ -772,12 +772,12 @@ static rsRetVal parse_long_param(const char *name, es_str_t *value, long min, lo
|
||||
errno = 0;
|
||||
parsed = strtol(text, &end, 10);
|
||||
if (errno != 0 || end == text || *end != '\0') {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: invalid numeric value '%s' for parameter %s", text, name);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: invalid numeric value '%s' for parameter %s", text, name);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (parsed < min) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: value %ld for %s is below the minimum %ld", parsed, name, min);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: value %ld for %s is below the minimum %ld", parsed, name, min);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -809,12 +809,12 @@ static rsRetVal parse_size_param(const char *name, es_str_t *value, size_t min,
|
||||
errno = 0;
|
||||
parsed = strtoull(text, &end, 10);
|
||||
if (errno != 0 || end == text || *end != '\0') {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: invalid size value '%s' for parameter %s", text, name);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: invalid size value '%s' for parameter %s", text, name);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (parsed < (unsigned long long)min) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: value %s for %s is below the minimum %zu", text, name, min);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: value %s for %s is below the minimum %zu", text, name, min);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -846,12 +846,12 @@ static rsRetVal parse_uint_param(const char *name, es_str_t *value, unsigned int
|
||||
errno = 0;
|
||||
parsed = strtoul(text, &end, 10);
|
||||
if (errno != 0 || end == text || *end != '\0') {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: invalid numeric value '%s' for parameter %s", text, name);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: invalid numeric value '%s' for parameter %s", text, name);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (parsed < (unsigned long)min) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: value %lu for %s is below the minimum %u", parsed, name, min);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: value %lu for %s is below the minimum %u", parsed, name, min);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -877,12 +877,12 @@ static rsRetVal parse_headers_json(instanceData *pData, const char *text) {
|
||||
|
||||
root = fjson_tokener_parse(text);
|
||||
if (root == NULL) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: failed to parse headers JSON '%s'", text);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: failed to parse headers JSON '%s'", text);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (!fjson_object_is_type(root, fjson_type_object)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: headers parameter must be a JSON object");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: headers parameter must be a JSON object");
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -895,7 +895,7 @@ static rsRetVal parse_headers_json(instanceData *pData, const char *text) {
|
||||
|
||||
if (value_obj != NULL) {
|
||||
if (!fjson_object_is_type(value_obj, fjson_type_string)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: header '%s' value must be a string", key);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: header '%s' value must be a string", key);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
value = fjson_object_get_string(value_obj);
|
||||
@ -925,12 +925,12 @@ static rsRetVal parse_attribute_map(instanceData *pData, const char *json_text)
|
||||
|
||||
root = fjson_tokener_parse(json_text);
|
||||
if (root == NULL) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: failed to parse attributeMap JSON: %s", json_text);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: failed to parse attributeMap JSON: %s", json_text);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (!fjson_object_is_type(root, fjson_type_object)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: attributeMap must be a JSON object");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: attributeMap must be a JSON object");
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -943,7 +943,7 @@ static rsRetVal parse_attribute_map(instanceData *pData, const char *json_text)
|
||||
const char *otlp_attr = NULL;
|
||||
|
||||
if (value_obj == NULL || !fjson_object_is_type(value_obj, fjson_type_string)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: attributeMap value for '%s' must be a string", rsyslog_prop);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: attributeMap value for '%s' must be a string", rsyslog_prop);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -979,12 +979,12 @@ static rsRetVal parse_severity_map(instanceData *pData, const char *json_text) {
|
||||
|
||||
root = fjson_tokener_parse(json_text);
|
||||
if (root == NULL) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: failed to parse severity.map JSON: %s", json_text);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: failed to parse severity.map JSON: %s", json_text);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (!fjson_object_is_type(root, fjson_type_object)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: severity.map must be a JSON object");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: severity.map must be a JSON object");
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1007,12 +1007,12 @@ static rsRetVal parse_severity_map(instanceData *pData, const char *json_text) {
|
||||
errno = 0;
|
||||
priority = (int)strtol(key, NULL, 10);
|
||||
if (errno != 0 || priority < 0 || priority > 7) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: severity.map key '%s' must be a number 0-7", key);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: severity.map key '%s' must be a number 0-7", key);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (value_obj == NULL || !fjson_object_is_type(value_obj, fjson_type_object)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: severity.map value for priority %d must be an object", priority);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: severity.map value for priority %d must be an object", priority);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1021,12 +1021,12 @@ static rsRetVal parse_severity_map(instanceData *pData, const char *json_text) {
|
||||
fjson_object_object_get_ex(value_obj, "text", &text_obj);
|
||||
|
||||
if (number_obj == NULL || !fjson_object_is_type(number_obj, fjson_type_int)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: severity.map[%d] must have 'number' field (integer)", priority);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: severity.map[%d] must have 'number' field (integer)", priority);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (text_obj == NULL || !fjson_object_is_type(text_obj, fjson_type_string)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: severity.map[%d] must have 'text' field (string)", priority);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: severity.map[%d] must have 'text' field (string)", priority);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1093,7 +1093,7 @@ static rsRetVal parse_headers_env(instanceData *pData, const char *text) {
|
||||
|
||||
value = strchr(token, '=');
|
||||
if (value == NULL) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: header entry '%s' is missing '='", token);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: header entry '%s' is missing '='", token);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1147,17 +1147,17 @@ static rsRetVal parse_timeout_string(const char *text, long *out) {
|
||||
errno = 0;
|
||||
parsed = strtol(number, &end, 10);
|
||||
if (errno != 0 || end == number || *end != '\0') {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: invalid timeout value '%s'", text);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: invalid timeout value '%s'", text);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (parsed < 0) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: timeout must be non-negative, got %ld", parsed);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: timeout must be non-negative, got %ld", parsed);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (multiplier != 1 && parsed > LONG_MAX / multiplier) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: timeout '%s' exceeds range", text);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: timeout '%s' exceeds range", text);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1183,7 +1183,7 @@ static rsRetVal set_compression_mode(instanceData *pData, const char *value) {
|
||||
|
||||
len = strlen(value);
|
||||
if (len >= sizeof(buffer)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: unsupported compression value '%s'", value);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: unsupported compression value '%s'", value);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1193,11 +1193,11 @@ static rsRetVal set_compression_mode(instanceData *pData, const char *value) {
|
||||
buffer[len] = '\0';
|
||||
|
||||
if (!strcmp(buffer, "gzip")) {
|
||||
pData->compression_mode = OMOTLP_COMPRESSION_GZIP;
|
||||
pData->compression_mode = OMOTEL_COMPRESSION_GZIP;
|
||||
} else if (!strcmp(buffer, "none")) {
|
||||
pData->compression_mode = OMOTLP_COMPRESSION_NONE;
|
||||
pData->compression_mode = OMOTEL_COMPRESSION_NONE;
|
||||
} else {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: compression '%s' is not supported", value);
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: compression '%s' is not supported", value);
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
@ -1235,7 +1235,7 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static void mapSeverity(int syslogSeverity, instanceData *pData, omotlp_log_record_t *record) {
|
||||
static void mapSeverity(int syslogSeverity, instanceData *pData, omotel_log_record_t *record) {
|
||||
if (syslogSeverity < 0 || syslogSeverity > 7) {
|
||||
record->severity_number = 0u;
|
||||
record->severity_text = NULL;
|
||||
@ -1511,7 +1511,7 @@ static char *extract_property_string(smsg_t *msg, const char *prop_name) {
|
||||
* @param[out] record OTLP log record to populate
|
||||
* @return RS_RET_OK on success
|
||||
*/
|
||||
static rsRetVal populateLogRecord(smsg_t *msg, const char *body, instanceData *pData, omotlp_log_record_t *record) {
|
||||
static rsRetVal populateLogRecord(smsg_t *msg, const char *body, instanceData *pData, omotel_log_record_t *record) {
|
||||
int severity;
|
||||
char *trace_id_str = NULL;
|
||||
char *span_id_str = NULL;
|
||||
@ -1543,7 +1543,7 @@ static rsRetVal populateLogRecord(smsg_t *msg, const char *body, instanceData *p
|
||||
record->trace_id = trace_id_str;
|
||||
trace_id_str = NULL; /* Ownership transferred */
|
||||
} else {
|
||||
LogError(0, RS_RET_OK, "omotlp: invalid trace_id format (expected 32 hex chars): %s", trace_id_str);
|
||||
LogError(0, RS_RET_OK, "omotel: invalid trace_id format (expected 32 hex chars): %s", trace_id_str);
|
||||
free(trace_id_str);
|
||||
trace_id_str = NULL;
|
||||
}
|
||||
@ -1555,7 +1555,7 @@ static rsRetVal populateLogRecord(smsg_t *msg, const char *body, instanceData *p
|
||||
record->span_id = span_id_str;
|
||||
span_id_str = NULL; /* Ownership transferred */
|
||||
} else {
|
||||
LogError(0, RS_RET_OK, "omotlp: invalid span_id format (expected 16 hex chars): %s", span_id_str);
|
||||
LogError(0, RS_RET_OK, "omotel: invalid span_id format (expected 16 hex chars): %s", span_id_str);
|
||||
free(span_id_str);
|
||||
span_id_str = NULL;
|
||||
}
|
||||
@ -1603,7 +1603,7 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static void omotlp_batch_entry_clear(omotlp_batch_entry_t *entry) {
|
||||
static void omotel_batch_entry_clear(omotel_batch_entry_t *entry) {
|
||||
if (entry == NULL) {
|
||||
return;
|
||||
}
|
||||
@ -1619,7 +1619,7 @@ static void omotlp_batch_entry_clear(omotlp_batch_entry_t *entry) {
|
||||
memset(entry, 0, sizeof(*entry));
|
||||
}
|
||||
|
||||
static void omotlp_batch_clear(omotlp_batch_state_t *batch) {
|
||||
static void omotel_batch_clear(omotel_batch_state_t *batch) {
|
||||
size_t i;
|
||||
|
||||
if (batch == NULL) {
|
||||
@ -1627,7 +1627,7 @@ static void omotlp_batch_clear(omotlp_batch_state_t *batch) {
|
||||
}
|
||||
|
||||
for (i = 0; i < batch->count; ++i) {
|
||||
omotlp_batch_entry_clear(&batch->entries[i]);
|
||||
omotel_batch_entry_clear(&batch->entries[i]);
|
||||
}
|
||||
|
||||
batch->count = 0u;
|
||||
@ -1635,19 +1635,19 @@ static void omotlp_batch_clear(omotlp_batch_state_t *batch) {
|
||||
batch->first_enqueue_ms = 0;
|
||||
}
|
||||
|
||||
static void omotlp_batch_destroy(omotlp_batch_state_t *batch) {
|
||||
static void omotel_batch_destroy(omotel_batch_state_t *batch) {
|
||||
if (batch == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
omotlp_batch_clear(batch);
|
||||
omotel_batch_clear(batch);
|
||||
free(batch->entries);
|
||||
batch->entries = NULL;
|
||||
batch->capacity = 0u;
|
||||
}
|
||||
|
||||
static rsRetVal omotlp_batch_ensure_capacity(omotlp_batch_state_t *batch, size_t needed) {
|
||||
omotlp_batch_entry_t *tmp;
|
||||
static rsRetVal omotel_batch_ensure_capacity(omotel_batch_state_t *batch, size_t needed) {
|
||||
omotel_batch_entry_t *tmp;
|
||||
size_t new_capacity;
|
||||
|
||||
DEFiRet;
|
||||
@ -1665,12 +1665,12 @@ static rsRetVal omotlp_batch_ensure_capacity(omotlp_batch_state_t *batch, size_t
|
||||
new_capacity *= 2u;
|
||||
}
|
||||
|
||||
tmp = (omotlp_batch_entry_t *)realloc(batch->entries, new_capacity * sizeof(omotlp_batch_entry_t));
|
||||
tmp = (omotel_batch_entry_t *)realloc(batch->entries, new_capacity * sizeof(omotel_batch_entry_t));
|
||||
if (tmp == NULL) {
|
||||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memset(tmp + batch->capacity, 0, (new_capacity - batch->capacity) * sizeof(omotlp_batch_entry_t));
|
||||
memset(tmp + batch->capacity, 0, (new_capacity - batch->capacity) * sizeof(omotel_batch_entry_t));
|
||||
batch->entries = tmp;
|
||||
batch->capacity = new_capacity;
|
||||
|
||||
@ -1702,7 +1702,7 @@ static rsRetVal gzip_compress_buffer(const uint8_t *input, size_t input_len, uin
|
||||
memset(&stream, 0, sizeof(stream));
|
||||
rc = deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
|
||||
if (rc != Z_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp: deflateInit2 failed: %d", rc);
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel: deflateInit2 failed: %d", rc);
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -1713,7 +1713,7 @@ static rsRetVal gzip_compress_buffer(const uint8_t *input, size_t input_len, uin
|
||||
|
||||
rc = deflate(&stream, Z_FINISH);
|
||||
if (rc != Z_STREAM_END) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp: gzip compression failed: %d", rc);
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel: gzip compression failed: %d", rc);
|
||||
deflateEnd(&stream);
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
@ -1748,8 +1748,8 @@ finalize_it:
|
||||
* @return RS_RET_OK on success, RS_RET_SUSPENDED for retryable errors,
|
||||
* RS_RET_PARAM_ERROR for invalid parameters
|
||||
*/
|
||||
static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_batch_state_t *batch) {
|
||||
omotlp_log_record_t *records = NULL;
|
||||
static rsRetVal omotel_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotel_batch_state_t *batch) {
|
||||
omotel_log_record_t *records = NULL;
|
||||
char *payload = NULL;
|
||||
uint8_t *compressed = NULL;
|
||||
const uint8_t *to_send;
|
||||
@ -1759,27 +1759,27 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
long status_code = 0;
|
||||
long latency_ms = 0;
|
||||
size_t record_count = 0u;
|
||||
omotlp_resource_attrs_t resource_attrs;
|
||||
omotel_resource_attrs_t resource_attrs;
|
||||
|
||||
DEFiRet;
|
||||
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch called, batch->count=%zu", batch ? batch->count : 0u);
|
||||
DBGPRINTF("omotel: omotel_flush_batch called, batch->count=%zu", batch ? batch->count : 0u);
|
||||
|
||||
if (pWrkrData == NULL || batch == NULL) {
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
|
||||
if (batch->count == 0u) {
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: batch is empty, skipping");
|
||||
DBGPRINTF("omotel: omotel_flush_batch: batch is empty, skipping");
|
||||
goto finalize_it;
|
||||
}
|
||||
|
||||
record_count = batch->count;
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: flushing %zu records", record_count);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: flushing %zu records", record_count);
|
||||
|
||||
STATSCOUNTER_INC(pWrkrData->ctrBatchesSubmitted, pWrkrData->mutCtrBatchesSubmitted);
|
||||
|
||||
records = (omotlp_log_record_t *)malloc(batch->count * sizeof(*records));
|
||||
records = (omotel_log_record_t *)malloc(batch->count * sizeof(*records));
|
||||
if (records == NULL) {
|
||||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
||||
}
|
||||
@ -1788,8 +1788,8 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
records[i] = batch->entries[i].record;
|
||||
}
|
||||
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: building JSON export for %zu records", batch->count);
|
||||
resource_attrs = (omotlp_resource_attrs_t){
|
||||
DBGPRINTF("omotel: omotel_flush_batch: building JSON export for %zu records", batch->count);
|
||||
resource_attrs = (omotel_resource_attrs_t){
|
||||
.service_instance_id = pWrkrData->pData->resourceServiceInstanceId
|
||||
? (const char *)pWrkrData->pData->resourceServiceInstanceId
|
||||
: NULL,
|
||||
@ -1800,25 +1800,25 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
};
|
||||
|
||||
CHKiRet(
|
||||
omotlp_json_build_export(records, batch->count, &resource_attrs, &pWrkrData->pData->attributeMap, &payload));
|
||||
omotel_json_build_export(records, batch->count, &resource_attrs, &pWrkrData->pData->attributeMap, &payload));
|
||||
|
||||
if (payload != NULL) {
|
||||
payload_len = strlen(payload);
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: JSON payload length=%zu", payload_len);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: JSON payload length=%zu", payload_len);
|
||||
}
|
||||
|
||||
to_send = (const uint8_t *)payload;
|
||||
send_len = payload_len;
|
||||
|
||||
if (pWrkrData->pData->compression_mode == OMOTLP_COMPRESSION_GZIP) {
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: compressing payload");
|
||||
if (pWrkrData->pData->compression_mode == OMOTEL_COMPRESSION_GZIP) {
|
||||
DBGPRINTF("omotel: omotel_flush_batch: compressing payload");
|
||||
CHKiRet(gzip_compress_buffer((const uint8_t *)payload, payload_len, &compressed, &send_len));
|
||||
to_send = compressed;
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: compressed size=%zu", send_len);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: compressed size=%zu", send_len);
|
||||
}
|
||||
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: calling omotlp_http_client_post, send_len=%zu", send_len);
|
||||
iRet = omotlp_http_client_post(pWrkrData->http_client, to_send, send_len, &status_code, &latency_ms);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: calling omotel_http_client_post, send_len=%zu", send_len);
|
||||
iRet = omotel_http_client_post(pWrkrData->http_client, to_send, send_len, &status_code, &latency_ms);
|
||||
|
||||
/* Update statistics based on HTTP response (status_code is set even on error) */
|
||||
if (status_code > 0) {
|
||||
@ -1828,8 +1828,8 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
if (latency_ms > 0) {
|
||||
STATSCOUNTER_ADD(pWrkrData->httpRequestLatencyMs, pWrkrData->mutHttpRequestLatencyMs, latency_ms);
|
||||
}
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: HTTP POST successful, clearing batch");
|
||||
omotlp_batch_clear(batch);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: HTTP POST successful, clearing batch");
|
||||
omotel_batch_clear(batch);
|
||||
} else if (status_code >= 400 && status_code < 500) {
|
||||
STATSCOUNTER_INC(pWrkrData->ctrHttpStatus4xx, pWrkrData->mutCtrHttpStatus4xx);
|
||||
if (status_code == 408 || status_code == 429) {
|
||||
@ -1843,7 +1843,7 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
STATSCOUNTER_ADD(pWrkrData->httpRequestLatencyMs, pWrkrData->mutHttpRequestLatencyMs,
|
||||
latency_ms);
|
||||
}
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: HTTP %ld error, retaining batch for retry", status_code);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: HTTP %ld error, retaining batch for retry", status_code);
|
||||
/* Don't clear batch - it will be retried by rsyslog */
|
||||
} else {
|
||||
STATSCOUNTER_INC(pWrkrData->ctrBatchesDropped, pWrkrData->mutCtrBatchesDropped);
|
||||
@ -1851,8 +1851,8 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
STATSCOUNTER_ADD(pWrkrData->httpRequestLatencyMs, pWrkrData->mutHttpRequestLatencyMs,
|
||||
latency_ms);
|
||||
}
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: HTTP %ld error, clearing batch", status_code);
|
||||
omotlp_batch_clear(batch);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: HTTP %ld error, clearing batch", status_code);
|
||||
omotel_batch_clear(batch);
|
||||
}
|
||||
} else {
|
||||
/* Non-retryable 4xx (400, 401, 403, 404, etc.) - clear batch */
|
||||
@ -1860,8 +1860,8 @@ static rsRetVal omotlp_flush_batch_locked(wrkrInstanceData_t *pWrkrData, omotlp_
|
||||
if (latency_ms > 0) {
|
||||
STATSCOUNTER_ADD(pWrkrData->httpRequestLatencyMs, pWrkrData->mutHttpRequestLatencyMs, latency_ms);
|
||||
}
|
||||
DBGPRINTF("omotlp: omotlp_flush_batch: HTTP 4xx error, clearing batch");
|
||||
omotlp_batch_clear(batch);
|
||||
DBGPRINTF("omotel: omotel_flush_batch: HTTP 4xx error, clearing batch");
|
||||
omotel_batch_clear(batch);
|
||||
/* For non-retryable 4xx errors, clear the error so the caller can continue.
|
||||
* The batch has been dropped, but new records should still be accepted. */
|
||||
if (iRet == RS_RET_DISCARDMSG) {
|
||||
@ -1897,7 +1897,7 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static rsRetVal omotlp_flush_batch(wrkrInstanceData_t *pWrkrData) {
|
||||
static rsRetVal omotel_flush_batch(wrkrInstanceData_t *pWrkrData) {
|
||||
rsRetVal iRet;
|
||||
|
||||
if (pWrkrData == NULL) {
|
||||
@ -1905,7 +1905,7 @@ static rsRetVal omotlp_flush_batch(wrkrInstanceData_t *pWrkrData) {
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pWrkrData->batch_mutex);
|
||||
iRet = omotlp_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
iRet = omotel_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
pthread_mutex_unlock(&pWrkrData->batch_mutex);
|
||||
|
||||
return iRet;
|
||||
@ -1922,7 +1922,7 @@ static rsRetVal omotlp_flush_batch(wrkrInstanceData_t *pWrkrData) {
|
||||
* @param[in] arg Worker instance data pointer (cast from void*)
|
||||
* @return NULL (pthread requirement)
|
||||
*/
|
||||
static void *omotlp_batch_flush_thread(void *arg) {
|
||||
static void *omotel_batch_flush_thread(void *arg) {
|
||||
wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *)arg;
|
||||
struct timespec req;
|
||||
req.tv_sec = 0;
|
||||
@ -1939,13 +1939,13 @@ static void *omotlp_batch_flush_thread(void *arg) {
|
||||
|
||||
if (pWrkrData->batch.count > 0u) {
|
||||
long long timeout_ms =
|
||||
pWrkrData->pData->batchTimeoutMs > 0 ? pWrkrData->pData->batchTimeoutMs : OMOTLP_IDLE_FLUSH_INTERVAL_MS;
|
||||
pWrkrData->pData->batchTimeoutMs > 0 ? pWrkrData->pData->batchTimeoutMs : OMOTEL_IDLE_FLUSH_INTERVAL_MS;
|
||||
if (timeout_ms > 0) {
|
||||
long long now = currentTimeMills();
|
||||
if (pWrkrData->batch.first_enqueue_ms != 0 && now - pWrkrData->batch.first_enqueue_ms >= timeout_ms) {
|
||||
/* Check stop flag before starting potentially long-running flush operation */
|
||||
if (!pWrkrData->flush_thread_stop) {
|
||||
(void)omotlp_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
(void)omotel_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1957,7 +1957,7 @@ static void *omotlp_batch_flush_thread(void *arg) {
|
||||
pthread_mutex_lock(&pWrkrData->batch_mutex);
|
||||
/* Final flush before thread exit - flush any remaining data */
|
||||
if (pWrkrData->batch.count > 0u) {
|
||||
(void)omotlp_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
(void)omotel_flush_batch_locked(pWrkrData, &pWrkrData->batch);
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->batch_mutex);
|
||||
|
||||
@ -1981,12 +1981,12 @@ static void *omotlp_batch_flush_thread(void *arg) {
|
||||
* @return RS_RET_OK on success, RS_RET_PARAM_ERROR for invalid parameters,
|
||||
* RS_RET_OUT_OF_MEMORY on allocation failure
|
||||
*/
|
||||
static rsRetVal omotlp_batch_add_record(wrkrInstanceData_t *pWrkrData,
|
||||
const omotlp_log_record_t *record,
|
||||
static rsRetVal omotel_batch_add_record(wrkrInstanceData_t *pWrkrData,
|
||||
const omotel_log_record_t *record,
|
||||
const char *body) {
|
||||
omotlp_batch_state_t *batch;
|
||||
omotel_batch_state_t *batch;
|
||||
instanceData *cfg;
|
||||
omotlp_batch_entry_t *entry = NULL;
|
||||
omotel_batch_entry_t *entry = NULL;
|
||||
const char *body_text;
|
||||
size_t body_len;
|
||||
size_t estimated_bytes;
|
||||
@ -2007,24 +2007,24 @@ static rsRetVal omotlp_batch_add_record(wrkrInstanceData_t *pWrkrData,
|
||||
mutex_locked = 1;
|
||||
|
||||
if (cfg->batchMaxItems > 0u && batch->count >= cfg->batchMaxItems) {
|
||||
CHKiRet(omotlp_flush_batch_locked(pWrkrData, batch));
|
||||
CHKiRet(omotel_flush_batch_locked(pWrkrData, batch));
|
||||
}
|
||||
|
||||
body_text = (body != NULL) ? body : "";
|
||||
body_len = strlen(body_text);
|
||||
estimated_bytes = OMOTLP_BATCH_RECORD_OVERHEAD + body_len;
|
||||
estimated_bytes = OMOTEL_BATCH_RECORD_OVERHEAD + body_len;
|
||||
|
||||
if (cfg->batchMaxBytes > 0u && batch->count > 0u && batch->estimated_bytes + estimated_bytes > cfg->batchMaxBytes) {
|
||||
CHKiRet(omotlp_flush_batch_locked(pWrkrData, batch));
|
||||
CHKiRet(omotel_flush_batch_locked(pWrkrData, batch));
|
||||
}
|
||||
|
||||
if (cfg->batchMaxBytes > 0u && estimated_bytes > cfg->batchMaxBytes) {
|
||||
LogError(0, RS_RET_OK,
|
||||
"omotlp: single record estimated size %zu exceeds batch.max_bytes %zu; sending individually",
|
||||
"omotel: single record estimated size %zu exceeds batch.max_bytes %zu; sending individually",
|
||||
estimated_bytes, cfg->batchMaxBytes);
|
||||
}
|
||||
|
||||
CHKiRet(omotlp_batch_ensure_capacity(batch, batch->count + 1u));
|
||||
CHKiRet(omotel_batch_ensure_capacity(batch, batch->count + 1u));
|
||||
entry = &batch->entries[batch->count];
|
||||
memset(entry, 0, sizeof(*entry));
|
||||
|
||||
@ -2059,7 +2059,7 @@ static rsRetVal omotlp_batch_add_record(wrkrInstanceData_t *pWrkrData,
|
||||
++batch->count;
|
||||
count_incremented = 1;
|
||||
if (batch->count == 1u) {
|
||||
batch->estimated_bytes = OMOTLP_BATCH_BASE_OVERHEAD + estimated_bytes;
|
||||
batch->estimated_bytes = OMOTEL_BATCH_BASE_OVERHEAD + estimated_bytes;
|
||||
batch->first_enqueue_ms = currentTimeMills();
|
||||
estimated_bytes_updated = 1;
|
||||
} else {
|
||||
@ -2068,9 +2068,9 @@ static rsRetVal omotlp_batch_add_record(wrkrInstanceData_t *pWrkrData,
|
||||
}
|
||||
|
||||
if (cfg->batchMaxItems > 0u && batch->count >= cfg->batchMaxItems) {
|
||||
CHKiRet(omotlp_flush_batch_locked(pWrkrData, batch));
|
||||
CHKiRet(omotel_flush_batch_locked(pWrkrData, batch));
|
||||
} else if (cfg->batchMaxBytes > 0u && batch->estimated_bytes >= cfg->batchMaxBytes) {
|
||||
CHKiRet(omotlp_flush_batch_locked(pWrkrData, batch));
|
||||
CHKiRet(omotel_flush_batch_locked(pWrkrData, batch));
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
@ -2092,12 +2092,12 @@ finalize_it:
|
||||
}
|
||||
/* Cleanup entry if it was allocated */
|
||||
if (entry != NULL) {
|
||||
omotlp_batch_entry_clear(entry);
|
||||
omotel_batch_entry_clear(entry);
|
||||
}
|
||||
} else {
|
||||
/* Cleanup entry without mutex if mutex was never acquired */
|
||||
if (entry != NULL) {
|
||||
omotlp_batch_entry_clear(entry);
|
||||
omotel_batch_entry_clear(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2107,8 +2107,8 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static rsRetVal omotlp_batch_flush_if_due(wrkrInstanceData_t *pWrkrData, long long now_ms) {
|
||||
omotlp_batch_state_t *batch;
|
||||
static rsRetVal omotel_batch_flush_if_due(wrkrInstanceData_t *pWrkrData, long long now_ms) {
|
||||
omotel_batch_state_t *batch;
|
||||
long long age;
|
||||
int mutex_locked = 0;
|
||||
|
||||
@ -2131,7 +2131,7 @@ static rsRetVal omotlp_batch_flush_if_due(wrkrInstanceData_t *pWrkrData, long lo
|
||||
|
||||
age = now_ms - batch->first_enqueue_ms;
|
||||
if (age >= pWrkrData->pData->batchTimeoutMs) {
|
||||
CHKiRet(omotlp_flush_batch_locked(pWrkrData, batch));
|
||||
CHKiRet(omotel_flush_batch_locked(pWrkrData, batch));
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
@ -2148,14 +2148,14 @@ static inline void setInstParamDefaults(instanceData *pData) {
|
||||
pData->bodyTemplateName = NULL;
|
||||
pData->url = NULL;
|
||||
pData->requestTimeoutMs = 10000;
|
||||
pData->batchMaxItems = OMOTLP_DEFAULT_BATCH_MAX_ITEMS;
|
||||
pData->batchMaxBytes = OMOTLP_DEFAULT_BATCH_MAX_BYTES;
|
||||
pData->batchTimeoutMs = OMOTLP_DEFAULT_BATCH_TIMEOUT_MS;
|
||||
pData->retryInitialMs = OMOTLP_DEFAULT_RETRY_INITIAL_MS;
|
||||
pData->retryMaxMs = OMOTLP_DEFAULT_RETRY_MAX_MS;
|
||||
pData->retryMaxRetries = OMOTLP_DEFAULT_RETRY_MAX_RETRIES;
|
||||
pData->retryJitterPercent = OMOTLP_DEFAULT_RETRY_JITTER_PERCENT;
|
||||
pData->compression_mode = OMOTLP_COMPRESSION_UNSET;
|
||||
pData->batchMaxItems = OMOTEL_DEFAULT_BATCH_MAX_ITEMS;
|
||||
pData->batchMaxBytes = OMOTEL_DEFAULT_BATCH_MAX_BYTES;
|
||||
pData->batchTimeoutMs = OMOTEL_DEFAULT_BATCH_TIMEOUT_MS;
|
||||
pData->retryInitialMs = OMOTEL_DEFAULT_RETRY_INITIAL_MS;
|
||||
pData->retryMaxMs = OMOTEL_DEFAULT_RETRY_MAX_MS;
|
||||
pData->retryMaxRetries = OMOTEL_DEFAULT_RETRY_MAX_RETRIES;
|
||||
pData->retryJitterPercent = OMOTEL_DEFAULT_RETRY_JITTER_PERCENT;
|
||||
pData->compression_mode = OMOTEL_COMPRESSION_UNSET;
|
||||
pData->compressionConfigured = 0;
|
||||
pData->headersConfigured = 0;
|
||||
pData->bearerConfigured = 0;
|
||||
@ -2217,7 +2217,7 @@ BEGINcreateInstance
|
||||
ENDcreateInstance
|
||||
|
||||
BEGINcreateWrkrInstance
|
||||
omotlp_http_client_config_t http_cfg;
|
||||
omotel_http_client_config_t http_cfg;
|
||||
char stats_name[256];
|
||||
CODESTARTcreateWrkrInstance;
|
||||
pWrkrData->pData = pData;
|
||||
@ -2239,7 +2239,7 @@ BEGINcreateWrkrInstance
|
||||
|
||||
http_cfg.url = (const char *)pData->url;
|
||||
http_cfg.timeout_ms = pData->requestTimeoutMs;
|
||||
http_cfg.user_agent = "rsyslog-omotlp/" VERSION;
|
||||
http_cfg.user_agent = "rsyslog-omotel/" VERSION;
|
||||
http_cfg.headers = (const char *const *)pData->headers.values;
|
||||
http_cfg.header_count = pData->headers.count;
|
||||
http_cfg.retry_initial_ms = pData->retryInitialMs;
|
||||
@ -2260,17 +2260,17 @@ BEGINcreateWrkrInstance
|
||||
http_cfg.proxy_user = (const char *)pData->proxyUser;
|
||||
http_cfg.proxy_password = (const char *)pData->proxyPassword;
|
||||
|
||||
iRet = omotlp_http_client_create(&http_cfg, &pWrkrData->http_client);
|
||||
iRet = omotel_http_client_create(&http_cfg, &pWrkrData->http_client);
|
||||
if (iRet != RS_RET_OK) {
|
||||
goto finalize_it;
|
||||
}
|
||||
|
||||
/* Initialize statistics */
|
||||
snprintf(stats_name, sizeof(stats_name), "omotlp-%s", pData->url ? (char *)pData->url : "default");
|
||||
snprintf(stats_name, sizeof(stats_name), "omotel-%s", pData->url ? (char *)pData->url : "default");
|
||||
stats_name[sizeof(stats_name) - 1] = '\0';
|
||||
CHKiRet(statsobj.Construct(&pWrkrData->stats));
|
||||
CHKiRet(statsobj.SetName(pWrkrData->stats, (uchar *)stats_name));
|
||||
CHKiRet(statsobj.SetOrigin(pWrkrData->stats, (uchar *)"omotlp"));
|
||||
CHKiRet(statsobj.SetOrigin(pWrkrData->stats, (uchar *)"omotel"));
|
||||
|
||||
STATSCOUNTER_INIT(pWrkrData->ctrBatchesSubmitted, pWrkrData->mutCtrBatchesSubmitted);
|
||||
CHKiRet(statsobj.AddCounter(pWrkrData->stats, (uchar *)"batches.submitted", ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||||
@ -2306,8 +2306,8 @@ BEGINcreateWrkrInstance
|
||||
|
||||
CHKiRet(statsobj.ConstructFinalize(pWrkrData->stats));
|
||||
|
||||
if (pthread_create(&pWrkrData->flush_thread, NULL, omotlp_batch_flush_thread, pWrkrData) != 0) {
|
||||
LogError(errno, RS_RET_SYS_ERR, "omotlp: failed to create flush thread");
|
||||
if (pthread_create(&pWrkrData->flush_thread, NULL, omotel_batch_flush_thread, pWrkrData) != 0) {
|
||||
LogError(errno, RS_RET_SYS_ERR, "omotel: failed to create flush thread");
|
||||
iRet = RS_RET_SYS_ERR;
|
||||
goto finalize_it;
|
||||
}
|
||||
@ -2329,7 +2329,7 @@ finalize_it:
|
||||
}
|
||||
pthread_mutex_destroy(&pWrkrData->batch_mutex);
|
||||
}
|
||||
omotlp_http_client_destroy(&pWrkrData->http_client);
|
||||
omotel_http_client_destroy(&pWrkrData->http_client);
|
||||
free(pWrkrData);
|
||||
pWrkrData = NULL;
|
||||
}
|
||||
@ -2379,10 +2379,10 @@ BEGINfreeWrkrInstance
|
||||
pthread_join(pWrkrData->flush_thread, NULL);
|
||||
pWrkrData->flush_thread_running = 0;
|
||||
}
|
||||
(void)omotlp_flush_batch(pWrkrData);
|
||||
omotlp_batch_destroy(&pWrkrData->batch);
|
||||
(void)omotel_flush_batch(pWrkrData);
|
||||
omotel_batch_destroy(&pWrkrData->batch);
|
||||
pthread_mutex_destroy(&pWrkrData->batch_mutex);
|
||||
omotlp_http_client_destroy(&pWrkrData->http_client);
|
||||
omotel_http_client_destroy(&pWrkrData->http_client);
|
||||
if (pWrkrData->stats != NULL) {
|
||||
statsobj.Destruct(&pWrkrData->stats);
|
||||
}
|
||||
@ -2391,7 +2391,7 @@ ENDfreeWrkrInstance
|
||||
|
||||
BEGINdbgPrintInstInfo
|
||||
CODESTARTdbgPrintInstInfo;
|
||||
dbgprintf("omotlp\n");
|
||||
dbgprintf("omotel\n");
|
||||
dbgprintf("\tendpoint='%s'\n", pData->endpoint ? (char *)pData->endpoint : "(default)");
|
||||
dbgprintf("\tpath='%s'\n", pData->path ? (char *)pData->path : "(default)");
|
||||
dbgprintf("\tprotocol='%s'\n", pData->protocol ? (char *)pData->protocol : "(default)");
|
||||
@ -2405,7 +2405,7 @@ BEGINdbgPrintInstInfo
|
||||
dbgprintf("\tretry.max.ms=%ld\n", pData->retryMaxMs);
|
||||
dbgprintf("\tretry.max_retries=%u\n", pData->retryMaxRetries);
|
||||
dbgprintf("\tretry.jitter.percent=%u\n", pData->retryJitterPercent);
|
||||
dbgprintf("\tcompression=%s\n", pData->compression_mode == OMOTLP_COMPRESSION_GZIP ? "gzip" : "none");
|
||||
dbgprintf("\tcompression=%s\n", pData->compression_mode == OMOTEL_COMPRESSION_GZIP ? "gzip" : "none");
|
||||
ENDdbgPrintInstInfo
|
||||
|
||||
BEGINtryResume
|
||||
@ -2442,7 +2442,7 @@ BEGINnewActInst
|
||||
CODESTARTnewActInst;
|
||||
|
||||
if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
|
||||
LogError(0, RS_RET_MISSING_CNFPARAMS, "omotlp: error reading config parameters");
|
||||
LogError(0, RS_RET_MISSING_CNFPARAMS, "omotel: error reading config parameters");
|
||||
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
|
||||
}
|
||||
|
||||
@ -2501,7 +2501,7 @@ BEGINnewActInst
|
||||
unsigned int jitter = 0u;
|
||||
CHKiRet(parse_uint_param("retry.jitter.percent", pvals[i].val.d.estr, 0u, &jitter));
|
||||
if (jitter > 100u) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: retry.jitter.percent must be between 0 and 100");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: retry.jitter.percent must be between 0 and 100");
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
pData->retryJitterPercent = jitter;
|
||||
@ -2548,7 +2548,7 @@ BEGINnewActInst
|
||||
/* Parse and validate JSON */
|
||||
resource_parsed = fjson_tokener_parse(resource_json_text);
|
||||
if (resource_parsed == NULL) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: resource parameter contains invalid JSON: %s",
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: resource parameter contains invalid JSON: %s",
|
||||
resource_json_text);
|
||||
free(resource_json_text);
|
||||
resource_json_text = NULL;
|
||||
@ -2556,7 +2556,7 @@ BEGINnewActInst
|
||||
}
|
||||
|
||||
if (!fjson_object_is_type(resource_parsed, fjson_type_object)) {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: resource parameter must be a JSON object");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: resource parameter must be a JSON object");
|
||||
fjson_object_put(resource_parsed);
|
||||
resource_parsed = NULL;
|
||||
free(resource_json_text);
|
||||
@ -2599,7 +2599,7 @@ BEGINnewActInst
|
||||
if (fp == NULL) {
|
||||
char errStr[1024];
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr));
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotlp: tls.cacert file '%s' cannot be accessed: %s",
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotel: tls.cacert file '%s' cannot be accessed: %s",
|
||||
pData->tlsCaCertFile, errStr);
|
||||
ABORT_FINALIZE(RS_RET_NO_FILE_ACCESS);
|
||||
}
|
||||
@ -2611,7 +2611,7 @@ BEGINnewActInst
|
||||
if (dir == NULL) {
|
||||
char errStr[1024];
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr));
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotlp: tls.cadir directory '%s' cannot be accessed: %s",
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotel: tls.cadir directory '%s' cannot be accessed: %s",
|
||||
pData->tlsCaCertDir, errStr);
|
||||
ABORT_FINALIZE(RS_RET_NO_FILE_ACCESS);
|
||||
}
|
||||
@ -2623,7 +2623,7 @@ BEGINnewActInst
|
||||
if (fp == NULL) {
|
||||
char errStr[1024];
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr));
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotlp: tls.cert file '%s' cannot be accessed: %s",
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotel: tls.cert file '%s' cannot be accessed: %s",
|
||||
pData->tlsClientCertFile, errStr);
|
||||
ABORT_FINALIZE(RS_RET_NO_FILE_ACCESS);
|
||||
}
|
||||
@ -2635,7 +2635,7 @@ BEGINnewActInst
|
||||
if (fp == NULL) {
|
||||
char errStr[1024];
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr));
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotlp: tls.key file '%s' cannot be accessed: %s",
|
||||
LogError(0, RS_RET_NO_FILE_ACCESS, "omotel: tls.key file '%s' cannot be accessed: %s",
|
||||
pData->tlsClientKeyFile, errStr);
|
||||
ABORT_FINALIZE(RS_RET_NO_FILE_ACCESS);
|
||||
}
|
||||
@ -2651,7 +2651,7 @@ BEGINnewActInst
|
||||
} else if (!strcmp(text, "off") || !strcmp(text, "no") || !strcmp(text, "0")) {
|
||||
pData->tlsVerifyHostname = 0;
|
||||
} else {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: tls.verify_hostname must be 'on' or 'off'");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: tls.verify_hostname must be 'on' or 'off'");
|
||||
free(text);
|
||||
text = NULL;
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
@ -2669,7 +2669,7 @@ BEGINnewActInst
|
||||
} else if (!strcmp(text, "off") || !strcmp(text, "no") || !strcmp(text, "0")) {
|
||||
pData->tlsVerifyPeer = 0;
|
||||
} else {
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotlp: tls.verify_peer must be 'on' or 'off'");
|
||||
LogError(0, RS_RET_PARAM_ERROR, "omotel: tls.verify_peer must be 'on' or 'off'");
|
||||
free(text);
|
||||
text = NULL;
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
@ -2685,7 +2685,7 @@ BEGINnewActInst
|
||||
strncmp((char *)pData->proxyUrl, "socks4://", 9) != 0 &&
|
||||
strncmp((char *)pData->proxyUrl, "socks5://", 9) != 0) {
|
||||
LogError(0, RS_RET_PARAM_ERROR,
|
||||
"omotlp: proxy URL must start with http://, https://, socks4://, or socks5://");
|
||||
"omotel: proxy URL must start with http://, https://, socks4://, or socks5://");
|
||||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||||
}
|
||||
}
|
||||
@ -2694,7 +2694,7 @@ BEGINnewActInst
|
||||
} else if (!strcmp(actpblk.descr[i].name, "proxy.password")) {
|
||||
CHKiRet(assignParamFromEStr(&pData->proxyPassword, pvals[i].val.d.estr));
|
||||
} else {
|
||||
dbgprintf("omotlp: unhandled parameter '%s'\n", actpblk.descr[i].name);
|
||||
dbgprintf("omotel: unhandled parameter '%s'\n", actpblk.descr[i].name);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2712,11 +2712,11 @@ BEGINnewActInst
|
||||
CHKmalloc(pData->traceFlagsPropertyName = (uchar *)strdup("trace_flags"));
|
||||
}
|
||||
|
||||
if (pData->compression_mode == OMOTLP_COMPRESSION_UNSET) {
|
||||
pData->compression_mode = OMOTLP_COMPRESSION_NONE;
|
||||
if (pData->compression_mode == OMOTEL_COMPRESSION_UNSET) {
|
||||
pData->compression_mode = OMOTEL_COMPRESSION_NONE;
|
||||
}
|
||||
|
||||
if (pData->compression_mode == OMOTLP_COMPRESSION_GZIP) {
|
||||
if (pData->compression_mode == OMOTEL_COMPRESSION_GZIP) {
|
||||
CHKiRet(header_list_add(&pData->headers, "Content-Encoding: gzip"));
|
||||
}
|
||||
|
||||
@ -2730,13 +2730,13 @@ BEGINnewActInst
|
||||
CHKiRet(buildEffectiveUrl(pData));
|
||||
|
||||
CODE_STD_STRING_REQUESTnewActInst(2);
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, OMOTLP_OMSR_IDX_MESSAGE, NULL, OMSR_TPL_AS_MSG));
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, OMOTEL_OMSR_IDX_MESSAGE, NULL, OMSR_TPL_AS_MSG));
|
||||
|
||||
tplToUse = (uchar *)strdup((char *)pData->bodyTemplateName);
|
||||
if (tplToUse == NULL) {
|
||||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
||||
}
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, OMOTLP_OMSR_IDX_BODY, tplToUse, OMSR_NO_RQD_TPL_OPTS));
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, OMOTEL_OMSR_IDX_BODY, tplToUse, OMSR_NO_RQD_TPL_OPTS));
|
||||
|
||||
CODE_STD_FINALIZERnewActInst;
|
||||
/* Free text allocations if they were allocated but not freed (error path cleanup) */
|
||||
@ -2776,10 +2776,10 @@ ENDnewActInst
|
||||
|
||||
BEGINdoAction
|
||||
char *body = NULL;
|
||||
omotlp_log_record_t record = {0};
|
||||
omotel_log_record_t record = {0};
|
||||
long long now_ms;
|
||||
smsg_t **ppMsgParam = (smsg_t **)pMsgData;
|
||||
smsg_t *msg = (ppMsgParam != NULL) ? ppMsgParam[OMOTLP_OMSR_IDX_MESSAGE] : NULL;
|
||||
smsg_t *msg = (ppMsgParam != NULL) ? ppMsgParam[OMOTEL_OMSR_IDX_MESSAGE] : NULL;
|
||||
CODESTARTdoAction;
|
||||
|
||||
if (pWrkrData->pData == NULL) {
|
||||
@ -2790,8 +2790,8 @@ BEGINdoAction
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (ppString != NULL && ppString[OMOTLP_OMSR_IDX_BODY] != NULL) {
|
||||
body = (char *)ppString[OMOTLP_OMSR_IDX_BODY];
|
||||
if (ppString != NULL && ppString[OMOTEL_OMSR_IDX_BODY] != NULL) {
|
||||
body = (char *)ppString[OMOTEL_OMSR_IDX_BODY];
|
||||
}
|
||||
|
||||
if (body == NULL) {
|
||||
@ -2799,10 +2799,10 @@ BEGINdoAction
|
||||
}
|
||||
|
||||
now_ms = currentTimeMills();
|
||||
CHKiRet(omotlp_batch_flush_if_due(pWrkrData, now_ms));
|
||||
CHKiRet(omotel_batch_flush_if_due(pWrkrData, now_ms));
|
||||
|
||||
CHKiRet(populateLogRecord(msg, body, pWrkrData->pData, &record));
|
||||
CHKiRet(omotlp_batch_add_record(pWrkrData, &record, body));
|
||||
CHKiRet(omotel_batch_add_record(pWrkrData, &record, body));
|
||||
|
||||
pthread_mutex_lock(&pWrkrData->batch_mutex);
|
||||
if (pWrkrData->batch.count == 0u) {
|
||||
@ -2827,7 +2827,7 @@ ENDdoAction
|
||||
NO_LEGACY_CONF_parseSelectorAct /* clang-format off */
|
||||
BEGINmodExit
|
||||
CODESTARTmodExit;
|
||||
omotlp_http_global_cleanup();
|
||||
omotel_http_global_cleanup();
|
||||
objRelease(datetime, CORE_COMPONENT);
|
||||
objRelease(prop, CORE_COMPONENT);
|
||||
objRelease(statsobj, CORE_COMPONENT);
|
||||
@ -2848,7 +2848,7 @@ ENDqueryEtryPt /* clang-format on */
|
||||
BEGINmodInit()
|
||||
CODESTARTmodInit;
|
||||
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
|
||||
CHKiRet(omotlp_http_global_init());
|
||||
CHKiRet(omotel_http_global_init());
|
||||
CHKiRet(objUse(datetime, CORE_COMPONENT));
|
||||
CHKiRet(objUse(prop, CORE_COMPONENT));
|
||||
CHKiRet(objUse(statsobj, CORE_COMPONENT));
|
||||
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* @file omotlp_http.c
|
||||
* @file omotel_http.c
|
||||
* @brief HTTP client implementation for OTLP/HTTP JSON transport
|
||||
*
|
||||
* This file implements the HTTP client interface using libcurl. It provides
|
||||
@ -26,7 +26,7 @@
|
||||
*/
|
||||
#include "config.h"
|
||||
|
||||
#include "omotlp_http.h"
|
||||
#include "omotel_http.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
#include <stdio.h>
|
||||
@ -37,7 +37,7 @@
|
||||
|
||||
#include "errmsg.h"
|
||||
|
||||
struct omotlp_http_client_s {
|
||||
struct omotel_http_client_s {
|
||||
CURL *handle;
|
||||
struct curl_slist *headers;
|
||||
char *url;
|
||||
@ -75,7 +75,7 @@ static size_t discard_response(void *ptr, size_t size, size_t nmemb, void *userd
|
||||
* @param[in] config Configuration parameters
|
||||
* @return RS_RET_OK on success, RS_RET_INTERNAL_ERROR on curl configuration failure
|
||||
*/
|
||||
static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_http_client_config_t *config) {
|
||||
static rsRetVal set_common_options(omotel_http_client_t *client, const omotel_http_client_config_t *config) {
|
||||
CURLcode rc;
|
||||
DEFiRet;
|
||||
|
||||
@ -85,32 +85,32 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_URL, client->url);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set URL: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set URL: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_POST, 1L);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to enable POST: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to enable POST: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_WRITEFUNCTION, discard_response);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to install response sink: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to install response sink: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_NOSIGNAL, 1L);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to disable signals: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to disable signals: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (config->timeout_ms > 0) {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_TIMEOUT_MS, config->timeout_ms);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set timeout: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set timeout: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
@ -118,20 +118,20 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->user_agent != NULL) {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_USERAGENT, config->user_agent);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set user-agent: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set user-agent: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_ERRORBUFFER, client->error_buffer);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to install error buffer: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to install error buffer: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_HTTPHEADER, client->headers);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to apply headers: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to apply headers: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -139,7 +139,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->tls_ca_cert_file != NULL && config->tls_ca_cert_file[0] != '\0') {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_CAINFO, config->tls_ca_cert_file);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set CA cert file: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set CA cert file: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
@ -147,7 +147,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->tls_ca_cert_dir != NULL && config->tls_ca_cert_dir[0] != '\0') {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_CAPATH, config->tls_ca_cert_dir);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set CA cert directory: %s",
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set CA cert directory: %s",
|
||||
curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
@ -156,7 +156,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->tls_client_cert_file != NULL && config->tls_client_cert_file[0] != '\0') {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_SSLCERT, config->tls_client_cert_file);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set client cert: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set client cert: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
@ -164,7 +164,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->tls_client_key_file != NULL && config->tls_client_key_file[0] != '\0') {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_SSLKEY, config->tls_client_key_file);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set client key: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set client key: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
@ -172,7 +172,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
/* Hostname verification */
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_SSL_VERIFYHOST, config->tls_verify_hostname ? 2L : 0L);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set hostname verification: %s",
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set hostname verification: %s",
|
||||
curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
@ -180,7 +180,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
/* Peer certificate verification */
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_SSL_VERIFYPEER, config->tls_verify_peer ? 1L : 0L);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set peer verification: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set peer verification: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -188,7 +188,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
if (config->proxy_url != NULL && config->proxy_url[0] != '\0') {
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_PROXY, config->proxy_url);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set proxy URL: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set proxy URL: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -209,7 +209,7 @@ static rsRetVal set_common_options(omotlp_http_client_t *client, const omotlp_ht
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_PROXYUSERPWD, userpwd);
|
||||
if (rc != CURLE_OK) {
|
||||
free(userpwd);
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set proxy credentials: %s",
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set proxy credentials: %s",
|
||||
curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
@ -222,11 +222,11 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
rsRetVal omotlp_http_global_init(void) {
|
||||
rsRetVal omotel_http_global_init(void) {
|
||||
if (!g_http_global_initialized) {
|
||||
CURLcode rc = curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: curl_global_init failed: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: curl_global_init failed: %s", curl_easy_strerror(rc));
|
||||
return RS_RET_INTERNAL_ERROR;
|
||||
}
|
||||
g_http_global_initialized = 1;
|
||||
@ -235,15 +235,15 @@ rsRetVal omotlp_http_global_init(void) {
|
||||
return RS_RET_OK;
|
||||
}
|
||||
|
||||
void omotlp_http_global_cleanup(void) {
|
||||
void omotel_http_global_cleanup(void) {
|
||||
if (g_http_global_initialized) {
|
||||
curl_global_cleanup();
|
||||
g_http_global_initialized = 0;
|
||||
}
|
||||
}
|
||||
|
||||
rsRetVal omotlp_http_client_create(const omotlp_http_client_config_t *config, omotlp_http_client_t **out_client) {
|
||||
omotlp_http_client_t *client = NULL;
|
||||
rsRetVal omotel_http_client_create(const omotel_http_client_config_t *config, omotel_http_client_t **out_client) {
|
||||
omotel_http_client_t *client = NULL;
|
||||
DEFiRet;
|
||||
|
||||
if (config == NULL || out_client == NULL || config->url == NULL) {
|
||||
@ -282,7 +282,7 @@ rsRetVal omotlp_http_client_create(const omotlp_http_client_config_t *config, om
|
||||
|
||||
client->handle = curl_easy_init();
|
||||
if (client->handle == NULL) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: curl_easy_init failed");
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: curl_easy_init failed");
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -298,14 +298,14 @@ rsRetVal omotlp_http_client_create(const omotlp_http_client_config_t *config, om
|
||||
|
||||
finalize_it:
|
||||
if (iRet != RS_RET_OK) {
|
||||
omotlp_http_client_destroy(&client);
|
||||
omotel_http_client_destroy(&client);
|
||||
}
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
void omotlp_http_client_destroy(omotlp_http_client_t **client_ptr) {
|
||||
omotlp_http_client_t *client;
|
||||
void omotel_http_client_destroy(omotel_http_client_t **client_ptr) {
|
||||
omotel_http_client_t *client;
|
||||
|
||||
if (client_ptr == NULL || *client_ptr == NULL) {
|
||||
return;
|
||||
@ -382,7 +382,7 @@ static int should_retry_status(long status) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
rsRetVal omotel_http_client_post(omotel_http_client_t *client,
|
||||
const uint8_t *payload,
|
||||
size_t payload_len,
|
||||
long *out_status_code,
|
||||
@ -397,7 +397,7 @@ rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
long long end_ms = 0;
|
||||
DEFiRet;
|
||||
|
||||
DBGPRINTF("omotlp/http: omotlp_http_client_post called, payload_len=%zu, url=%s", payload_len,
|
||||
DBGPRINTF("omotel/http: omotel_http_client_post called, payload_len=%zu, url=%s", payload_len,
|
||||
client ? (client->url ? client->url : "(null)") : "(null client)");
|
||||
|
||||
if (client == NULL) {
|
||||
@ -417,13 +417,13 @@ rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_POSTFIELDS, (const char *)payload_bytes);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set payload: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set payload: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
rc = curl_easy_setopt(client->handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)payload_len);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to set payload size: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to set payload size: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
@ -434,13 +434,13 @@ rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
status = 0;
|
||||
start_ms = currentTimeMills();
|
||||
|
||||
DBGPRINTF("omotlp/http: calling curl_easy_perform, attempt %u", retries + 1);
|
||||
DBGPRINTF("omotel/http: calling curl_easy_perform, attempt %u", retries + 1);
|
||||
rc = curl_easy_perform(client->handle);
|
||||
end_ms = currentTimeMills();
|
||||
DBGPRINTF("omotlp/http: curl_easy_perform returned: %d (%s)", rc, curl_easy_strerror(rc));
|
||||
DBGPRINTF("omotel/http: curl_easy_perform returned: %d (%s)", rc, curl_easy_strerror(rc));
|
||||
if (rc != CURLE_OK) {
|
||||
const char *err = client->error_buffer[0] != '\0' ? client->error_buffer : curl_easy_strerror(rc);
|
||||
LogError(0, RS_RET_SUSPENDED, "omotlp/http: HTTP POST failed: %s", err);
|
||||
LogError(0, RS_RET_SUSPENDED, "omotel/http: HTTP POST failed: %s", err);
|
||||
|
||||
if (retries >= client->retry_max_retries) {
|
||||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||||
@ -459,18 +459,18 @@ rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
|
||||
rc = curl_easy_getinfo(client->handle, CURLINFO_RESPONSE_CODE, &status);
|
||||
if (rc != CURLE_OK) {
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotlp/http: failed to read response code: %s", curl_easy_strerror(rc));
|
||||
LogError(0, RS_RET_INTERNAL_ERROR, "omotel/http: failed to read response code: %s", curl_easy_strerror(rc));
|
||||
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
DBGPRINTF("omotlp/http: HTTP response status: %ld", status);
|
||||
DBGPRINTF("omotel/http: HTTP response status: %ld", status);
|
||||
if (status >= 200 && status < 300) {
|
||||
DBGPRINTF("omotlp/http: HTTP POST successful (status %ld)", status);
|
||||
DBGPRINTF("omotel/http: HTTP POST successful (status %ld)", status);
|
||||
goto finalize_it;
|
||||
}
|
||||
|
||||
if (should_retry_status(status)) {
|
||||
LogError(0, RS_RET_SUSPENDED, "omotlp/http: collector returned status %ld; retrying batch", status);
|
||||
LogError(0, RS_RET_SUSPENDED, "omotel/http: collector returned status %ld; retrying batch", status);
|
||||
if (retries >= client->retry_max_retries) {
|
||||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||||
}
|
||||
@ -486,7 +486,7 @@ rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
continue;
|
||||
}
|
||||
|
||||
LogError(0, RS_RET_DISCARDMSG, "omotlp/http: collector rejected payload with status %ld", status);
|
||||
LogError(0, RS_RET_DISCARDMSG, "omotel/http: collector rejected payload with status %ld", status);
|
||||
ABORT_FINALIZE(RS_RET_DISCARDMSG);
|
||||
}
|
||||
|
||||
@ -497,6 +497,6 @@ finalize_it:
|
||||
if (out_latency_ms != NULL && start_ms > 0 && end_ms >= start_ms) {
|
||||
*out_latency_ms = (long)(end_ms - start_ms);
|
||||
}
|
||||
DBGPRINTF("omotlp/http: omotlp_http_client_post completed, iRet=%d", iRet);
|
||||
DBGPRINTF("omotel/http: omotel_http_client_post completed, iRet=%d", iRet);
|
||||
RETiRet;
|
||||
}
|
||||
@ -1,8 +1,8 @@
|
||||
/**
|
||||
* @file omotlp_http.h
|
||||
* @file omotel_http.h
|
||||
* @brief HTTP client interface for OTLP/HTTP JSON transport
|
||||
*
|
||||
* This header defines the HTTP client API used by the omotlp module to
|
||||
* This header defines the HTTP client API used by the omotel module to
|
||||
* communicate with OpenTelemetry collectors over HTTP/JSON. It provides
|
||||
* functions for client lifecycle management and HTTP POST operations with
|
||||
* retry semantics, TLS, and proxy support.
|
||||
@ -25,8 +25,8 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef OMOTLP_HTTP_H
|
||||
#define OMOTLP_HTTP_H
|
||||
#ifndef OMOTEL_HTTP_H
|
||||
#define OMOTEL_HTTP_H
|
||||
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
@ -39,7 +39,7 @@
|
||||
* Contains all configuration parameters needed to create and configure
|
||||
* an HTTP client for OTLP/HTTP JSON transport.
|
||||
*/
|
||||
typedef struct omotlp_http_client_config_s {
|
||||
typedef struct omotel_http_client_config_s {
|
||||
const char *url; /**< Target URL for HTTP POST requests */
|
||||
long timeout_ms; /**< Request timeout in milliseconds */
|
||||
const char *user_agent; /**< User-Agent header value */
|
||||
@ -60,10 +60,10 @@ typedef struct omotlp_http_client_config_s {
|
||||
const char *proxy_url; /**< Proxy URL (http://, https://, socks4://, or socks5://) */
|
||||
const char *proxy_user; /**< Proxy username for authentication */
|
||||
const char *proxy_password; /**< Proxy password for authentication */
|
||||
} omotlp_http_client_config_t;
|
||||
} omotel_http_client_config_t;
|
||||
|
||||
/** @brief Opaque HTTP client handle */
|
||||
typedef struct omotlp_http_client_s omotlp_http_client_t;
|
||||
typedef struct omotel_http_client_s omotel_http_client_t;
|
||||
|
||||
/**
|
||||
* @brief Initialize the HTTP client library
|
||||
@ -74,7 +74,7 @@ typedef struct omotlp_http_client_s omotlp_http_client_t;
|
||||
*
|
||||
* @return RS_RET_OK on success, RS_RET_INTERNAL_ERROR on failure
|
||||
*/
|
||||
rsRetVal omotlp_http_global_init(void);
|
||||
rsRetVal omotel_http_global_init(void);
|
||||
|
||||
/**
|
||||
* @brief Cleanup the HTTP client library
|
||||
@ -82,13 +82,13 @@ rsRetVal omotlp_http_global_init(void);
|
||||
* This function should be called during module shutdown to cleanup
|
||||
* the underlying libcurl library. It is safe to call multiple times.
|
||||
*/
|
||||
void omotlp_http_global_cleanup(void);
|
||||
void omotel_http_global_cleanup(void);
|
||||
|
||||
/**
|
||||
* @brief Create a new HTTP client instance
|
||||
*
|
||||
* Creates and configures an HTTP client with the provided configuration.
|
||||
* The client handle must be destroyed using omotlp_http_client_destroy()
|
||||
* The client handle must be destroyed using omotel_http_client_destroy()
|
||||
* when no longer needed.
|
||||
*
|
||||
* @param[in] config Configuration parameters for the HTTP client
|
||||
@ -97,7 +97,7 @@ void omotlp_http_global_cleanup(void);
|
||||
* RS_RET_OUT_OF_MEMORY on allocation failure, RS_RET_INTERNAL_ERROR
|
||||
* on configuration failure
|
||||
*/
|
||||
rsRetVal omotlp_http_client_create(const omotlp_http_client_config_t *config, omotlp_http_client_t **out_client);
|
||||
rsRetVal omotel_http_client_create(const omotel_http_client_config_t *config, omotel_http_client_t **out_client);
|
||||
|
||||
/**
|
||||
* @brief Destroy an HTTP client instance
|
||||
@ -107,7 +107,7 @@ rsRetVal omotlp_http_client_create(const omotlp_http_client_config_t *config, om
|
||||
*
|
||||
* @param[in,out] client_ptr Pointer to the client handle to destroy
|
||||
*/
|
||||
void omotlp_http_client_destroy(omotlp_http_client_t **client_ptr);
|
||||
void omotel_http_client_destroy(omotel_http_client_t **client_ptr);
|
||||
|
||||
/**
|
||||
* @brief Send an HTTP POST request
|
||||
@ -126,10 +126,10 @@ void omotlp_http_client_destroy(omotlp_http_client_t **client_ptr);
|
||||
* errors (5xx, 408, 429, network errors), RS_RET_INTERNAL_ERROR for
|
||||
* configuration errors, RS_RET_PARAM_ERROR for invalid parameters
|
||||
*/
|
||||
rsRetVal omotlp_http_client_post(omotlp_http_client_t *client,
|
||||
rsRetVal omotel_http_client_post(omotel_http_client_t *client,
|
||||
const uint8_t *payload,
|
||||
size_t payload_len,
|
||||
long *out_status_code,
|
||||
long *out_latency_ms);
|
||||
|
||||
#endif /* OMOTLP_HTTP_H */
|
||||
#endif /* OMOTEL_HTTP_H */
|
||||
@ -209,9 +209,9 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
rsRetVal omotlp_json_build_export(const omotlp_log_record_t *records,
|
||||
rsRetVal omotel_json_build_export(const omotel_log_record_t *records,
|
||||
size_t record_count,
|
||||
const omotlp_resource_attrs_t *resource_attrs,
|
||||
const omotel_resource_attrs_t *resource_attrs,
|
||||
const attribute_map_t *attribute_map,
|
||||
char **out_payload) {
|
||||
struct json_object *root = NULL;
|
||||
@ -267,7 +267,7 @@ rsRetVal omotlp_json_build_export(const omotlp_log_record_t *records,
|
||||
fjson_object_object_add(resource, "attributes", resource_attributes);
|
||||
|
||||
CHKiRet(add_string_attribute(resource_attributes, "service.name", "rsyslog"));
|
||||
CHKiRet(add_string_attribute(resource_attributes, "telemetry.sdk.name", "rsyslog-omotlp"));
|
||||
CHKiRet(add_string_attribute(resource_attributes, "telemetry.sdk.name", "rsyslog-omotel"));
|
||||
CHKiRet(add_string_attribute(resource_attributes, "telemetry.sdk.language", "C"));
|
||||
CHKiRet(add_string_attribute(resource_attributes, "telemetry.sdk.version", VERSION));
|
||||
|
||||
@ -386,7 +386,7 @@ rsRetVal omotlp_json_build_export(const omotlp_log_record_t *records,
|
||||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
||||
}
|
||||
fjson_object_object_add(scope_entry, "scope", scope);
|
||||
fjson_object_object_add(scope, "name", fjson_object_new_string("rsyslog.omotlp"));
|
||||
fjson_object_object_add(scope, "name", fjson_object_new_string("rsyslog.omotel"));
|
||||
fjson_object_object_add(scope, "version", fjson_object_new_string(VERSION));
|
||||
|
||||
log_records = fjson_object_new_array();
|
||||
@ -396,7 +396,7 @@ rsRetVal omotlp_json_build_export(const omotlp_log_record_t *records,
|
||||
fjson_object_object_add(scope_entry, "logRecords", log_records);
|
||||
|
||||
for (i = 0; i < record_count; ++i) {
|
||||
const omotlp_log_record_t *record = &records[i];
|
||||
const omotel_log_record_t *record = &records[i];
|
||||
struct json_object *log_record = NULL;
|
||||
struct json_object *attributes = NULL;
|
||||
|
||||
@ -25,8 +25,8 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef OMOTLP_OTLP_JSON_H
|
||||
#define OMOTLP_OTLP_JSON_H
|
||||
#ifndef OMOTEL_OTLP_JSON_H
|
||||
#define OMOTEL_OTLP_JSON_H
|
||||
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
@ -40,7 +40,7 @@
|
||||
* Contains all fields for a single OpenTelemetry log record. String fields
|
||||
* are pointers to external memory and are not freed by this module.
|
||||
*/
|
||||
typedef struct omotlp_log_record_s {
|
||||
typedef struct omotel_log_record_s {
|
||||
uint64_t time_unix_nano; /**< Log timestamp in Unix nanoseconds */
|
||||
uint64_t observed_time_unix_nano; /**< Observation timestamp in Unix nanoseconds */
|
||||
uint32_t severity_number; /**< OTLP severity number (0-24) */
|
||||
@ -54,7 +54,7 @@ typedef struct omotlp_log_record_s {
|
||||
const char *span_id; /**< OpenTelemetry span ID (16 hex chars) */
|
||||
uint8_t trace_flags; /**< OpenTelemetry trace flags */
|
||||
uint16_t facility; /**< Syslog facility number */
|
||||
} omotlp_log_record_t;
|
||||
} omotel_log_record_t;
|
||||
|
||||
/**
|
||||
* @brief OTLP resource attributes structure
|
||||
@ -62,11 +62,11 @@ typedef struct omotlp_log_record_s {
|
||||
* Contains resource-level attributes that are applied to all log records
|
||||
* in a batch. Custom attributes are provided as a parsed JSON object.
|
||||
*/
|
||||
typedef struct omotlp_resource_attrs_s {
|
||||
typedef struct omotel_resource_attrs_s {
|
||||
const char *service_instance_id; /**< Service instance identifier */
|
||||
const char *deployment_environment; /**< Deployment environment name */
|
||||
struct json_object *custom_attributes; /**< Parsed JSON object with custom resource attributes */
|
||||
} omotlp_resource_attrs_t;
|
||||
} omotel_resource_attrs_t;
|
||||
|
||||
/* Forward declaration for attribute_map_t */
|
||||
typedef struct attribute_map_s attribute_map_t;
|
||||
@ -92,10 +92,10 @@ typedef struct attribute_map_s attribute_map_t;
|
||||
* @return RS_RET_OK on success, RS_RET_PARAM_ERROR for invalid parameters,
|
||||
* RS_RET_OUT_OF_MEMORY on allocation failure
|
||||
*/
|
||||
rsRetVal omotlp_json_build_export(const omotlp_log_record_t *records,
|
||||
rsRetVal omotel_json_build_export(const omotel_log_record_t *records,
|
||||
size_t record_count,
|
||||
const omotlp_resource_attrs_t *resource_attrs,
|
||||
const omotel_resource_attrs_t *resource_attrs,
|
||||
const attribute_map_t *attribute_map,
|
||||
char **out_payload);
|
||||
|
||||
#endif /* OMOTLP_OTLP_JSON_H */
|
||||
#endif /* OMOTEL_OTLP_JSON_H */
|
||||
@ -1057,15 +1057,15 @@ TESTS += \
|
||||
endif
|
||||
endif
|
||||
|
||||
if ENABLE_OMOTLP
|
||||
if ENABLE_OMOTEL
|
||||
TESTS += \
|
||||
omotlp-http-batch.sh \
|
||||
omotlp-basic.sh \
|
||||
omotlp-batch.sh \
|
||||
omotlp-compression.sh \
|
||||
omotlp-proxy.sh \
|
||||
omotlp-tls.sh \
|
||||
omotlp-trace-correlation.sh
|
||||
omotel-http-batch.sh \
|
||||
omotel-basic.sh \
|
||||
omotel-batch.sh \
|
||||
omotel-compression.sh \
|
||||
omotel-proxy.sh \
|
||||
omotel-tls.sh \
|
||||
omotel-trace-correlation.sh
|
||||
endif
|
||||
|
||||
if ENABLE_OMKAFKA
|
||||
|
||||
@ -1,14 +1,14 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-attribute-mapping.sh -- test custom attribute mapping for omotlp module
|
||||
## omotel-attribute-mapping.sh -- test custom attribute mapping for omotel module
|
||||
##
|
||||
## Tests that custom attribute mappings (attributeMap) correctly remap
|
||||
## rsyslog properties to custom OTLP attribute names.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
require_plugin omotlp
|
||||
# Check if omotel module is available
|
||||
require_plugin omotel
|
||||
|
||||
export NUMMESSAGES=10
|
||||
export EXTRA_EXIT=otel
|
||||
@ -49,12 +49,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="%msg%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
# Export with custom attribute mapping
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -103,14 +103,14 @@ try:
|
||||
if "logRecords" in scope_log:
|
||||
records.extend(scope_log["logRecords"])
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-attribute-mapping: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-attribute-mapping: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not records:
|
||||
sys.stderr.write("omotlp-attribute-mapping: OTLP output did not contain any logRecords\n")
|
||||
sys.stderr.write("omotel-attribute-mapping: OTLP output did not contain any logRecords\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-attribute-mapping: found {len(records)} log records in OTLP output\n")
|
||||
sys.stdout.write(f"omotel-attribute-mapping: found {len(records)} log records in OTLP output\n")
|
||||
|
||||
# Filter records to only test messages (contain "test message" in body)
|
||||
test_records = []
|
||||
@ -121,10 +121,10 @@ for record in records:
|
||||
test_records.append(record)
|
||||
|
||||
if len(test_records) < 10:
|
||||
sys.stderr.write(f"omotlp-attribute-mapping: expected at least 10 test records, found {len(test_records)}\n")
|
||||
sys.stderr.write(f"omotel-attribute-mapping: expected at least 10 test records, found {len(test_records)}\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-attribute-mapping: filtered to {len(test_records)} test records\n")
|
||||
sys.stdout.write(f"omotel-attribute-mapping: filtered to {len(test_records)} test records\n")
|
||||
|
||||
# Expected test data (from input file)
|
||||
expected_test_data = [
|
||||
@ -237,15 +237,15 @@ for idx, record in enumerate(test_records[:10]): # Only check first 10 test rec
|
||||
if facility_int != expected_facility:
|
||||
errors.append(f"record {idx}: log.syslog.facility.code mismatch: expected {expected_facility}, got {facility_int}")
|
||||
|
||||
sys.stdout.write(f"omotlp-attribute-mapping: record {idx}: verified {len(attr_dict)} attributes\n")
|
||||
sys.stdout.write(f"omotel-attribute-mapping: record {idx}: verified {len(attr_dict)} attributes\n")
|
||||
|
||||
if errors:
|
||||
sys.stderr.write("omotlp-attribute-mapping: attribute mapping verification errors:\n")
|
||||
sys.stderr.write("omotel-attribute-mapping: attribute mapping verification errors:\n")
|
||||
for error in errors:
|
||||
sys.stderr.write(f" - {error}\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-attribute-mapping: successfully verified custom attribute mappings for {len(test_records[:10])} test records\n")
|
||||
sys.stdout.write(f"omotel-attribute-mapping: successfully verified custom attribute mappings for {len(test_records[:10])} test records\n")
|
||||
PY
|
||||
|
||||
exit_test
|
||||
@ -1,17 +1,17 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-basic.sh -- basic functionality test for omotlp module
|
||||
## omotel-basic.sh -- basic functionality test for omotel module
|
||||
##
|
||||
## Starts OTEL Collector, sends messages via omotlp, and verifies
|
||||
## Starts OTEL Collector, sends messages via omotel, and verifies
|
||||
## messages are received and stored correctly in OTLP JSON format.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
# NOTE: The module must be enabled during configure with --enable-omotlp
|
||||
# Check if omotel module is available
|
||||
# NOTE: The module must be enabled during configure with --enable-omotel
|
||||
# If the module fails to load with error -1001 (RS_RET_MISSING_INTERFACE),
|
||||
# the module was not built correctly and needs to be rebuilt with --enable-omotlp
|
||||
require_plugin omotlp
|
||||
# the module was not built correctly and needs to be rebuilt with --enable-omotel
|
||||
require_plugin omotel
|
||||
|
||||
export NUMMESSAGES=1000
|
||||
export EXTRA_EXIT=otel
|
||||
@ -38,12 +38,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -92,11 +92,11 @@ try:
|
||||
if "logRecords" in scope_log:
|
||||
records.extend(scope_log["logRecords"])
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-basic: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-basic: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not records:
|
||||
sys.stderr.write("omotlp-basic: OTLP output did not contain any logRecords\n")
|
||||
sys.stderr.write("omotel-basic: OTLP output did not contain any logRecords\n")
|
||||
sys.exit(1)
|
||||
|
||||
def has_hostname(attrs):
|
||||
@ -109,10 +109,10 @@ def has_hostname(attrs):
|
||||
|
||||
for idx, record in enumerate(records):
|
||||
if record.get("severityNumber", 0) == 0:
|
||||
sys.stderr.write(f"omotlp-basic: record {idx} missing severityNumber\n")
|
||||
sys.stderr.write(f"omotel-basic: record {idx} missing severityNumber\n")
|
||||
sys.exit(1)
|
||||
if not has_hostname(record.get("attributes", [])):
|
||||
sys.stderr.write(f"omotlp-basic: record {idx} missing log.syslog.hostname attribute\n")
|
||||
sys.stderr.write(f"omotel-basic: record {idx} missing log.syslog.hostname attribute\n")
|
||||
sys.exit(1)
|
||||
PY
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-batch.sh -- batching test for omotlp module
|
||||
## omotel-batch.sh -- batching test for omotel module
|
||||
##
|
||||
## Tests that omotlp correctly batches multiple log records into
|
||||
## Tests that omotel correctly batches multiple log records into
|
||||
## single ExportLogsServiceRequest payloads.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
@ -31,12 +31,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -1,8 +1,8 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-compression.sh -- compression and resource config test for omotlp module
|
||||
## omotel-compression.sh -- compression and resource config test for omotel module
|
||||
##
|
||||
## Tests that omotlp correctly sends gzip-compressed payloads
|
||||
## Tests that omotel correctly sends gzip-compressed payloads
|
||||
## and that OTEL Collector can decode them. Also tests resource parameter
|
||||
## configuration with string, integer, double, and boolean attributes.
|
||||
|
||||
@ -32,12 +32,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -85,14 +85,14 @@ try:
|
||||
if "resourceLogs" in payload:
|
||||
payloads.append(payload)
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-compression: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-compression: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not payloads:
|
||||
sys.stderr.write("omotlp-compression: OTLP output did not contain any resourceLogs\n")
|
||||
sys.stderr.write("omotel-compression: OTLP output did not contain any resourceLogs\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-compression: found {len(payloads)} payload(s)\n")
|
||||
sys.stdout.write(f"omotel-compression: found {len(payloads)} payload(s)\n")
|
||||
|
||||
# Extract resource attributes from the first payload
|
||||
resource_attrs = {}
|
||||
@ -104,7 +104,7 @@ for payload in payloads:
|
||||
if "resource" in resource_log and "attributes" in resource_log["resource"]:
|
||||
found_resource = True
|
||||
attrs = resource_log["resource"]["attributes"]
|
||||
sys.stdout.write(f"omotlp-compression: found {len(attrs)} resource attributes\n")
|
||||
sys.stdout.write(f"omotel-compression: found {len(attrs)} resource attributes\n")
|
||||
|
||||
# Parse attributes into a dictionary
|
||||
for attr_entry in attrs:
|
||||
@ -129,16 +129,16 @@ for payload in payloads:
|
||||
break
|
||||
|
||||
if not found_resource:
|
||||
sys.stderr.write("omotlp-compression: no resource attributes found in OTLP output\n")
|
||||
sys.stderr.write("omotel-compression: no resource attributes found in OTLP output\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-compression: extracted {len(resource_attrs)} resource attributes\n")
|
||||
sys.stdout.write(f"omotel-compression: extracted {len(resource_attrs)} resource attributes\n")
|
||||
|
||||
# Expected attributes (automatic + custom)
|
||||
expected_attrs = {
|
||||
# Automatic attributes (always present)
|
||||
"service.name": ("string", "test-service"), # Overridden by custom config
|
||||
"telemetry.sdk.name": ("string", "rsyslog-omotlp"),
|
||||
"telemetry.sdk.name": ("string", "rsyslog-omotel"),
|
||||
"telemetry.sdk.language": ("string", "C"),
|
||||
|
||||
# Custom attributes from resource parameter
|
||||
@ -168,7 +168,7 @@ for key, (expected_type, expected_value) in expected_attrs.items():
|
||||
errors.append(f"attribute {key}: value mismatch (expected {expected_value}, got {actual_value})")
|
||||
continue
|
||||
|
||||
sys.stdout.write(f"omotlp-compression: verified {key} = {actual_value} ({actual_type})\n")
|
||||
sys.stdout.write(f"omotel-compression: verified {key} = {actual_value} ({actual_type})\n")
|
||||
|
||||
# Check for telemetry.sdk.version (automatic, but version may vary)
|
||||
if "telemetry.sdk.version" not in resource_attrs:
|
||||
@ -178,15 +178,15 @@ else:
|
||||
if sdk_type != "string" or not sdk_version:
|
||||
errors.append(f"telemetry.sdk.version has invalid value: {sdk_version}")
|
||||
else:
|
||||
sys.stdout.write(f"omotlp-compression: verified telemetry.sdk.version = {sdk_version}\n")
|
||||
sys.stdout.write(f"omotel-compression: verified telemetry.sdk.version = {sdk_version}\n")
|
||||
|
||||
if errors:
|
||||
sys.stderr.write("omotlp-compression: resource attribute verification errors:\n")
|
||||
sys.stderr.write("omotel-compression: resource attribute verification errors:\n")
|
||||
for error in errors:
|
||||
sys.stderr.write(f" - {error}\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-compression: successfully verified all resource attributes\n")
|
||||
sys.stdout.write(f"omotel-compression: successfully verified all resource attributes\n")
|
||||
PY
|
||||
|
||||
seq_check
|
||||
@ -1,27 +1,27 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-http-batch.sh -- smoke-test OTLP batching and retries
|
||||
## omotel-http-batch.sh -- smoke-test OTLP batching and retries
|
||||
##
|
||||
## Starts the omhttp test server, emits four messages through omotlp with
|
||||
## Starts the omhttp test server, emits four messages through omotel with
|
||||
## batching and gzip enabled, and verifies the collector captured two payloads
|
||||
## with the expected retry behaviour and record order.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
require_plugin omotlp
|
||||
# Check if omotel module is available
|
||||
require_plugin omotel
|
||||
|
||||
omhttp_start_server 0 --decompress --fail-every 2 --fail-with 503
|
||||
|
||||
generate_conf
|
||||
add_conf '
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
template(name="otlpBody" type="string" string="%msg%")
|
||||
|
||||
# Process all messages through omotlp
|
||||
# Process all messages through omotel
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$omhttp_server_lstnport'"
|
||||
path="/v1/logs"
|
||||
@ -31,7 +31,7 @@ action(
|
||||
retry.initial.ms="10"
|
||||
retry.max.ms="100"
|
||||
retry.max_retries="3"
|
||||
headers='{ "X-Test-Header": "omotlp" }'
|
||||
headers='{ "X-Test-Header": "omotel" }'
|
||||
)
|
||||
'
|
||||
|
||||
@ -1,14 +1,14 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-proxy.sh -- proxy support test for omotlp module
|
||||
## omotel-proxy.sh -- proxy support test for omotel module
|
||||
##
|
||||
## Tests that omotlp correctly uses HTTP proxy to forward requests to
|
||||
## Tests that omotel correctly uses HTTP proxy to forward requests to
|
||||
## OTEL Collector, with and without proxy authentication.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
require_plugin omotlp
|
||||
# Check if omotel module is available
|
||||
require_plugin omotel
|
||||
|
||||
export NUMMESSAGES=100
|
||||
export EXTRA_EXIT="otel proxy"
|
||||
@ -38,7 +38,7 @@ fi
|
||||
echo "Using OTEL Collector port: $otel_port"
|
||||
|
||||
# Start proxy server (no authentication)
|
||||
proxy_server_py="$srcdir/omotlp_proxy_server.py"
|
||||
proxy_server_py="$srcdir/omotel_proxy_server.py"
|
||||
if [ ! -f "$proxy_server_py" ]; then
|
||||
echo "ERROR: Proxy server script not found: $proxy_server_py"
|
||||
error_exit 1
|
||||
@ -101,12 +101,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-proxy"
|
||||
type="omotlp"
|
||||
name="omotel-proxy"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -178,14 +178,14 @@ try:
|
||||
if "logRecords" in scope_log:
|
||||
records.extend(scope_log["logRecords"])
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-proxy: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-proxy: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not records:
|
||||
sys.stderr.write("omotlp-proxy: OTLP output did not contain any logRecords\n")
|
||||
sys.stderr.write("omotel-proxy: OTLP output did not contain any logRecords\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-proxy: successfully received {len(records)} log records via proxy\n")
|
||||
sys.stdout.write(f"omotel-proxy: successfully received {len(records)} log records via proxy\n")
|
||||
PY
|
||||
|
||||
seq_check
|
||||
@ -247,12 +247,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-proxy-auth"
|
||||
type="omotlp"
|
||||
name="omotel-proxy-auth"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port2'"
|
||||
path="/v1/logs"
|
||||
@ -1,15 +1,15 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-tls.sh -- TLS/mTLS support test for omotlp module
|
||||
## omotel-tls.sh -- TLS/mTLS support test for omotel module
|
||||
##
|
||||
## Tests that omotlp correctly establishes HTTPS connections with TLS
|
||||
## Tests that omotel correctly establishes HTTPS connections with TLS
|
||||
## certificate verification, and optionally with mutual TLS (mTLS) using
|
||||
## client certificates.
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
require_plugin omotlp
|
||||
# Check if omotel module is available
|
||||
require_plugin omotel
|
||||
|
||||
export NUMMESSAGES=500
|
||||
export EXTRA_EXIT=otel
|
||||
@ -114,12 +114,12 @@ generate_conf
|
||||
add_conf '
|
||||
template(name="otlpBody" type="string" string="msgnum:%msg:F,58:2%")
|
||||
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
if $msg contains "msgnum:" then
|
||||
action(
|
||||
name="omotlp-https"
|
||||
type="omotlp"
|
||||
name="omotel-https"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="https://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -172,14 +172,14 @@ try:
|
||||
if "logRecords" in scope_log:
|
||||
records.extend(scope_log["logRecords"])
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-tls: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-tls: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not records:
|
||||
sys.stderr.write("omotlp-tls: OTLP output did not contain any logRecords\n")
|
||||
sys.stderr.write("omotel-tls: OTLP output did not contain any logRecords\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-tls: successfully received {len(records)} log records via HTTPS/TLS\n")
|
||||
sys.stdout.write(f"omotel-tls: successfully received {len(records)} log records via HTTPS/TLS\n")
|
||||
PY
|
||||
|
||||
seq_check
|
||||
@ -1,6 +1,6 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
## omotlp-trace-correlation.sh -- test trace correlation support for omotlp module
|
||||
## omotel-trace-correlation.sh -- test trace correlation support for omotel module
|
||||
##
|
||||
## Tests that trace_id, span_id, and trace_flags are extracted from message
|
||||
## properties and included in the OTLP export. Uses mmjsonparse to extract
|
||||
@ -8,8 +8,8 @@
|
||||
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# Check if omotlp module is available
|
||||
require_plugin omotlp
|
||||
# Check if omotel module is available
|
||||
require_plugin omotel
|
||||
require_plugin mmjsonparse
|
||||
|
||||
export NUMMESSAGES=10
|
||||
@ -52,15 +52,15 @@ add_conf '
|
||||
template(name="otlpBody" type="string" string="%msg%")
|
||||
|
||||
module(load="../plugins/mmjsonparse/.libs/mmjsonparse")
|
||||
module(load="../plugins/omotlp/.libs/omotlp")
|
||||
module(load="../plugins/omotel/.libs/omotel")
|
||||
|
||||
# Parse JSON to extract trace properties
|
||||
action(type="mmjsonparse" mode="find-json")
|
||||
|
||||
# Export with trace correlation
|
||||
action(
|
||||
name="omotlp-http"
|
||||
type="omotlp"
|
||||
name="omotel-http"
|
||||
type="omotel"
|
||||
template="otlpBody"
|
||||
endpoint="http://127.0.0.1:'$otel_port'"
|
||||
path="/v1/logs"
|
||||
@ -111,21 +111,21 @@ try:
|
||||
if "logRecords" in scope_log:
|
||||
records.extend(scope_log["logRecords"])
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: failed to parse OTLP output: {exc}\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: failed to parse OTLP output: {exc}\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not records:
|
||||
sys.stderr.write("omotlp-trace-correlation: OTLP output did not contain any logRecords\n")
|
||||
sys.stderr.write("omotel-trace-correlation: OTLP output did not contain any logRecords\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-trace-correlation: found {len(records)} log records in OTLP output\n")
|
||||
sys.stdout.write(f"omotel-trace-correlation: found {len(records)} log records in OTLP output\n")
|
||||
|
||||
# Expected trace values
|
||||
expected_trace_id = "4bf92f3577b34da6a3ce929d0e0e4736"
|
||||
expected_span_id = "00f067aa0ba902b7"
|
||||
expected_trace_flags = 1
|
||||
|
||||
sys.stdout.write(f"omotlp-trace-correlation: expected traceId='{expected_trace_id}', spanId='{expected_span_id}', flags={expected_trace_flags}\n")
|
||||
sys.stdout.write(f"omotel-trace-correlation: expected traceId='{expected_trace_id}', spanId='{expected_span_id}', flags={expected_trace_flags}\n")
|
||||
|
||||
# Verify trace correlation fields
|
||||
for idx, record in enumerate(records):
|
||||
@ -133,30 +133,30 @@ for idx, record in enumerate(records):
|
||||
span_id = record.get("spanId")
|
||||
trace_flags = record.get("flags")
|
||||
|
||||
sys.stdout.write(f"omotlp-trace-correlation: record {idx}: traceId={trace_id}, spanId={span_id}, flags={trace_flags}\n")
|
||||
sys.stdout.write(f"omotel-trace-correlation: record {idx}: traceId={trace_id}, spanId={span_id}, flags={trace_flags}\n")
|
||||
|
||||
if trace_id is None:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} missing traceId\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} missing traceId\n")
|
||||
sys.exit(1)
|
||||
if trace_id != expected_trace_id:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} traceId mismatch: expected '{expected_trace_id}', got '{trace_id}'\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} traceId mismatch: expected '{expected_trace_id}', got '{trace_id}'\n")
|
||||
sys.exit(1)
|
||||
|
||||
if span_id is None:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} missing spanId\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} missing spanId\n")
|
||||
sys.exit(1)
|
||||
if span_id != expected_span_id:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} spanId mismatch: expected '{expected_span_id}', got '{span_id}'\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} spanId mismatch: expected '{expected_span_id}', got '{span_id}'\n")
|
||||
sys.exit(1)
|
||||
|
||||
if trace_flags is None:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} missing flags\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} missing flags\n")
|
||||
sys.exit(1)
|
||||
if trace_flags != expected_trace_flags:
|
||||
sys.stderr.write(f"omotlp-trace-correlation: record {idx} flags mismatch: expected {expected_trace_flags}, got {trace_flags}\n")
|
||||
sys.stderr.write(f"omotel-trace-correlation: record {idx} flags mismatch: expected {expected_trace_flags}, got {trace_flags}\n")
|
||||
sys.exit(1)
|
||||
|
||||
sys.stdout.write(f"omotlp-trace-correlation: verified {len(records)} records with trace correlation\n")
|
||||
sys.stdout.write(f"omotel-trace-correlation: verified {len(records)} records with trace correlation\n")
|
||||
PY
|
||||
|
||||
exit_test
|
||||
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple HTTP proxy server for omotlp proxy testing.
|
||||
Simple HTTP proxy server for omotel proxy testing.
|
||||
|
||||
This proxy server:
|
||||
- Forwards HTTP requests to a target server
|
||||
@ -206,7 +206,7 @@ class ProxyHandler(BaseHTTPRequestHandler):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Simple HTTP proxy for omotlp testing')
|
||||
parser = argparse.ArgumentParser(description='Simple HTTP proxy for omotel testing')
|
||||
parser.add_argument('-p', '--port', type=int, default=0, help='Listen port (0 = auto)')
|
||||
parser.add_argument('--port-file', type=str, default='', help='File to write listen port')
|
||||
parser.add_argument('--target-host', type=str, default='127.0.0.1', help='Target server host')
|
||||
Loading…
x
Reference in New Issue
Block a user