mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-18 19:10:42 +01:00
omprog: added error handling and transaction support for external plugins
This commit is contained in:
parent
fffda03d47
commit
52000005c5
189
plugins/external/INTERFACE.md
vendored
189
plugins/external/INTERFACE.md
vendored
@ -14,13 +14,13 @@ parameters inside the plugin code.
|
||||
How the plugin receives messages
|
||||
--------------------------------
|
||||
Rsyslog pushes messages via stdin. Each message is terminated by a LF. So
|
||||
a plugin can obtain a full messages simply by reading a line.
|
||||
a plugin can obtain a full message simply by reading a line.
|
||||
|
||||
This can cause problems with multi-line messages. There are some cures for
|
||||
this. The recommended one is to use JSON message format (more on message
|
||||
formats below). This will encode LF as "\n" (by JSON RFC requirements) and
|
||||
thus will ensure there are no embedded LFs even in multiline messages.
|
||||
An alternative is to use a message format which contains some other delimiter
|
||||
thus will ensure there are no embedded LFs even in multiline messages.
|
||||
An alternative is to use a message format which contains some other delimiter
|
||||
and program the plugin to watch for this delimiter. With the near-universal
|
||||
availability of JSON libraries in languages these days, we strongly think that
|
||||
the JSON approach is superior.
|
||||
@ -28,22 +28,170 @@ the JSON approach is superior.
|
||||
The _message format_ is generated by standard rsyslog methods, that is via
|
||||
a template. This gives full flexibility over what the plugin is fed. Unfortunately
|
||||
this also means the necessary template definitions are needed. See the rsyslog
|
||||
doc for how to do that (in the future, this file will also contain some
|
||||
doc for how to do that (in the future, this file will also contain some
|
||||
samples).
|
||||
|
||||
How to provide feedback to rsyslog
|
||||
----------------------------------
|
||||
The plugin may want to convey error information to rsyslog. Unfortunately, the
|
||||
current interface is very weak in this regard. All a plugin currently can do
|
||||
is terminate. In the future, there will be a capability to send feedback via
|
||||
stdout or stderr. For that reason, make sure that _no data is written to stdout
|
||||
or stderr_ by your plugin.
|
||||
Providing Feedback to Rsyslog
|
||||
=============================
|
||||
The plugin may convey error information to rsyslog. To do this, set the
|
||||
`confirmMessages` flag to `on` in the `omprog` action configuration (this flag
|
||||
is disabled by default). When this flag is enabled, rsyslog will wait for a
|
||||
confirmation from the plugin after sending every log message to it.
|
||||
|
||||
Plugins that write data to stdout or stderr are _not_ compliant to the current
|
||||
interface specification and will not continue to work in the future.
|
||||
The plugin must confirm the message by writing a line with the word `OK` to
|
||||
its standard output. That is, the plugin must write the characters `O`, `K` and
|
||||
LF (line feed) to stdout.
|
||||
|
||||
If the plugin writes a line to stdout containing anything else (for example,
|
||||
the string `Error: could not connect to the database` followed by a LF), rsyslog
|
||||
will consider that the plugin has not been able to process the message. The
|
||||
message will be retried later, according to the retry settings configured for
|
||||
the `omprog` action (e.g. the `action.resumeInterval` setting). When debugging
|
||||
is enabled in rsyslog, the line returned by the plugin will be included in the
|
||||
debug logs.
|
||||
|
||||
If the plugin terminates, the message is also considered as non-processed.
|
||||
The plugin will later be restarted, and the message retried, according to the
|
||||
configured retry settings.
|
||||
|
||||
When starting the plugin, if `confirmMessages` is `on`, rsyslog will also wait
|
||||
for the plugin to confirm its initialization. The plugin must write an `OK` line
|
||||
to stdout just after starting. If it writes anything else or terminates, rsyslog
|
||||
will consider the plugin initialization has failed, and will try to restart it later.
|
||||
|
||||
Example of exchanged messages
|
||||
-----------------------------
|
||||
The following sequence illustrates the message exchanges between rsyslog and the
|
||||
plugin. A right arrow (`=>`) indicates a message read by the plugin from its
|
||||
stdin, and a left arrow (`<=`) indicates a message written by the plugin to its
|
||||
stdout. Note that the arrows themselves are not read or written. Each line is
|
||||
terminated by a LF (\n).
|
||||
|
||||
<= OK
|
||||
=> log message 1
|
||||
<= OK
|
||||
=> log message 2
|
||||
<= OK
|
||||
...
|
||||
=> log message N
|
||||
<= OK
|
||||
|
||||
Note that the first `OK` does not confirm any message, but that the plugin has
|
||||
correctly started and is ready to receive messages. When the plugin receives
|
||||
an end-of-file (EOF), it must silently terminate.
|
||||
|
||||
Writing to stderr
|
||||
-----------------
|
||||
Aside from confirming messages via stdout, at any moment the plugin may write
|
||||
anything it wants to stderr. The `output` setting of the `omprog` action allows
|
||||
capturing the plugin's stderr to a file, which can be useful for debugging.
|
||||
Apart from this facility, rsyslog will ignore the plugin's stderr.
|
||||
|
||||
Note: When the `output` setting is specified and `confirmMessages` is set to
|
||||
`off`, rsyslog will capture both the stdout and stderr of the plugin to the
|
||||
specified file. You can use this to debug your plugin if you think it is not confirming
|
||||
the messages as expected.
|
||||
|
||||
Example implementation
|
||||
----------------------
|
||||
See [this Python plugin skeleton](skeletons/python/plugin-with-feedback.py) for
|
||||
a featured example on how a plugin can provide feedback to rsyslog.
|
||||
|
||||
|
||||
Batching of Messages (Transactions)
|
||||
===================================
|
||||
You can write a plugin that processes the messages in batches (also called
|
||||
_transactions_), instead of individually. For a general explanation on how
|
||||
rsyslog handles the batching of messages, see
|
||||
http://www.rsyslog.com/doc/v8-stable/development/dev_oplugins.html.
|
||||
|
||||
How to process the messages in batches (transactions)
|
||||
-----------------------------------------------------
|
||||
To enable transactions, set the `useTransactions` flag to `on` in the `omprog`
|
||||
action configuration. When this flag is enabled, rsyslog will send a special
|
||||
message line to the plugin's stdin to indicate that a batch of log messages is
|
||||
going to be sent. This special message is `BEGIN TRANSACTION` by default, although
|
||||
it can be customized using the `beginTransactionMark` setting of `omprog`.
|
||||
|
||||
After the `BEGIN TRANSACTION` line, rsyslog will send the log messages in the
|
||||
batch, each one in its own line, and then another special message `COMMIT
|
||||
TRANSACTION` to indicate the batch has ended. (The later can be customized via
|
||||
the `commitTransactionMark` setting.)
|
||||
|
||||
That is:
|
||||
|
||||
BEGIN TRANSACTION
|
||||
log message 1
|
||||
log message 2
|
||||
...
|
||||
log message N
|
||||
COMMIT TRANSACTION
|
||||
BEGIN TRANSACTION
|
||||
...
|
||||
COMMIT TRANSACTION
|
||||
BEGIN TRANSACTION
|
||||
...
|
||||
COMMIT TRANSACTION
|
||||
...
|
||||
|
||||
(with a LF at the end of each line)
|
||||
|
||||
When transactions are enabled, rsyslog will always send log messages within a
|
||||
transaction block, never outside it, even when the batch consists of a single message.
|
||||
|
||||
How to provide feedback when using transactions
|
||||
-----------------------------------------------
|
||||
You can enable both the `useTransactions` and `confirmMessages` settings in
|
||||
the `omprog` action, only one of them, or none of them. When both settings
|
||||
are set to `on`, the plugin must confirm the `BEGIN TRANSACTION` and `COMMIT
|
||||
TRANSACTION` messages, and the log messages within the transaction. The log
|
||||
messages within the transaction can be confirmed with any of the following
|
||||
status codes (which the plugin must write to stdout):
|
||||
* `OK`
|
||||
* `DEFER_COMMIT`
|
||||
* `PREVIOUS_COMMITTED`
|
||||
|
||||
Refer to http://www.rsyslog.com/doc/v8-stable/development/dev_oplugins.html
|
||||
for an explanation on the meaning of these status codes. You will typically
|
||||
need to return the `DEFER_COMMIT` status code, since the other codes imply a
|
||||
partial commit, and do not guarantee that the `COMMIT TRANSACTION` will be
|
||||
received.
|
||||
|
||||
The following sequence illustrates the exchanges between rsyslog and the plugin
|
||||
when transactions and message confirmations are enabled, and the plugin
|
||||
confirms the log messages within each transaction with `DEFER_COMMIT`:
|
||||
|
||||
<= OK
|
||||
=> BEGIN TRANSACTION
|
||||
<= OK
|
||||
=> log message 1
|
||||
<= DEFER_COMMIT
|
||||
=> log message 2
|
||||
<= DEFER_COMMIT
|
||||
...
|
||||
=> log message 5
|
||||
<= DEFER_COMMIT
|
||||
=> COMMIT TRANSACTION
|
||||
<= OK
|
||||
=> BEGIN TRANSACTION
|
||||
<= OK
|
||||
=> log message 6
|
||||
<= DEFER_COMMIT
|
||||
=> log message 7
|
||||
<= DEFER_COMMIT
|
||||
...
|
||||
=> log message 10
|
||||
<= DEFER_COMMIT
|
||||
=> COMMIT TRANSACTION
|
||||
<= OK
|
||||
|
||||
Example implementation
|
||||
----------------------
|
||||
For a reference example of a plugin with transaction support, see [this Python
|
||||
plugin skeleton](skeletons/python/plugin-with-feedback.py).
|
||||
|
||||
Threading Model
|
||||
---------------
|
||||
===============
|
||||
Write your plugin as you would do in a single threaded environment. Rsyslog
|
||||
automatically detects when it is time to spawn additional threads. If it
|
||||
decides so, it will also spawn another instance of your script and feed it
|
||||
@ -53,17 +201,12 @@ are no longer needed.
|
||||
If your plugin for some reason cannot be run in multiple instances, there are ways
|
||||
to tell rsyslog to work with a single instance. But it is strongly suggested to
|
||||
not restrict rsyslog to do that. Multiple instances in almost all cases do NOT mean
|
||||
any burden to the plugin developer. Just think of two (or more) independent
|
||||
any burden to the plugin developer. Just think of two (or more) independent
|
||||
instances of your program running in different console windows. If that is no
|
||||
problem, rsyslog running multiple instances of it is also no problem.
|
||||
|
||||
Future Enhancements
|
||||
===================
|
||||
The external output plugin interface will be enhanced within the next future.
|
||||
Most importantly, it will provide support for conveying back status information
|
||||
to rsyslog. Note that all existing plugins will continue to work, even when the
|
||||
interface is evolved. So there is no need to wait for a new interface version.
|
||||
|
||||
Interfaces for external input, filter and message modification plugins are
|
||||
planned. Most probably, they will become available in the order mentioned
|
||||
in the last sentence.
|
||||
@ -116,10 +259,10 @@ Most message properties can be modified. Modifiable are:
|
||||
* all message variable ("$!" tree)
|
||||
|
||||
If the message variable tree is modified, new variables may also be *added*. Deletion
|
||||
of message variables is not directly supported. If this is desired, it is suggested
|
||||
of message variables is not directly supported. If this is desired, it is suggested
|
||||
to set the variable in question to the empty string ("").
|
||||
|
||||
Implementation
|
||||
-------------
|
||||
The plugin interface is implemented via the "mmexternal" native plugin. See it's
|
||||
--------------
|
||||
The plugin interface is implemented via the "mmexternal" native plugin. See its
|
||||
documentation on how to tie your plugin into rsyslog's procesing flow.
|
||||
|
||||
295
plugins/external/skeletons/python/plugin-with-feedback.py
vendored
Executable file
295
plugins/external/skeletons/python/plugin-with-feedback.py
vendored
Executable file
@ -0,0 +1,295 @@
|
||||
#!/usr/bin/env python3
|
||||
"""A skeleton for a Python rsyslog output plugin with error handling
|
||||
and transaction support. Requires Python 3.
|
||||
|
||||
To integrate a plugin based on this skeleton with rsyslog, configure an
|
||||
'omprog' action like the following:
|
||||
action(type="omprog"
|
||||
binary="/usr/bin/myplugin.py"
|
||||
confirmMessages="on"
|
||||
useTransactions="on" # or "off" if you don't need transactions
|
||||
...)
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
-or-
|
||||
see COPYING.ASL20 in the source distribution
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
|
||||
|
||||
# Global definitions specific to your plugin
|
||||
outfile = None
|
||||
|
||||
|
||||
class RecoverableError(Exception):
|
||||
"""An error that has caused the processing of the current message or
|
||||
transaction to fail, but does not require restarting the plugin.
|
||||
|
||||
An example of such an error would be a temporary loss of connection to
|
||||
a database or a server. If such an error occurs in the onBeginTransation,
|
||||
onMessage or onCommitTransaction functions, your plugin should wrap it
|
||||
in a RecoverableError before raising it. For example:
|
||||
|
||||
try:
|
||||
# code that connects to a database
|
||||
except DbConnectionError as e:
|
||||
raise RecoverableError from e
|
||||
|
||||
Recoverable errors will cause the 'omprog' action to be temporarily
|
||||
suspended by rsyslog, during a period that can be configured using the
|
||||
"action.resumeInterval" action parameter. When the action is resumed,
|
||||
rsyslog will resend the failed message or transaction to your plugin.
|
||||
"""
|
||||
|
||||
|
||||
def onInit():
|
||||
"""Do everything that is needed to initialize processing (e.g. open files,
|
||||
create handles, connect to systems...).
|
||||
"""
|
||||
# Apart from processing the logs received from rsyslog, you want your plugin
|
||||
# to be able to report its own logs in some way. This will facilitate
|
||||
# diagnosing problems and debugging your code. Here we set up the standard
|
||||
# Python logging system to output the logs to stderr.
|
||||
logging.basicConfig(stream=sys.stderr,
|
||||
level=logging.WARNING,
|
||||
format='%(asctime)s %(levelname)s %(message)s')
|
||||
|
||||
# In the rsyslog configuration, you can configure the 'omprog' action to
|
||||
# capture the stderr of your plugin by specifying the action's "output"
|
||||
# parameter. However, note that this parameter is intended for debugging
|
||||
# purposes and cannot be used on a permanent basis (in particular, it will
|
||||
# not work well if rsyslog decides to launch multiple instances of your
|
||||
# plugin, since this would cause concurrent writes to the output file).
|
||||
# As an alternative to logging to stderr, you can log to a per-process
|
||||
# file, as shown here:
|
||||
# logging.basicConfig(filename="myplugin-{}.log".format(os.getpid()), ...)
|
||||
|
||||
# This is an example of a debug log. (Note that for debug logs to be
|
||||
# emitted you must set 'level' to logging.DEBUG above.)
|
||||
logging.debug("onInit called")
|
||||
|
||||
# For illustrative purposes, this plugin skeleton appends the received logs
|
||||
# to a file. When implementing your plugin, remove the following code.
|
||||
global outfile
|
||||
outfile = open("/tmp/logfile", "w")
|
||||
|
||||
|
||||
def onBeginTransaction():
|
||||
"""Begin the processing of a batch of messages.
|
||||
|
||||
This function is invoked only when the "useTransactions" parameter is
|
||||
configured to "on" in the 'omprog' action.
|
||||
|
||||
You can implement this function to e.g. start a database transaction.
|
||||
|
||||
Raises:
|
||||
RecoverableError: If a recoverable error occurs. The message or the
|
||||
transaction will be retried without restarting the plugin.
|
||||
Exception: If a non-recoverable error occurs. The plugin will be
|
||||
restarted before retrying the message or the transaction.
|
||||
"""
|
||||
logging.debug("onBeginTransaction called")
|
||||
|
||||
|
||||
def onMessage(msg):
|
||||
"""Process one log message received from rsyslog (e.g. send it to a
|
||||
database).
|
||||
|
||||
If this function raises an error and the "useTransactions" parameter is
|
||||
configured to "on" in the 'omprog' action, rsyslog will retry the full
|
||||
batch of messages. Otherwise, if "useTransactions" is set to "off", only
|
||||
this message will be retried.
|
||||
|
||||
Args:
|
||||
msg (str): the log message. Does NOT include a trailing newline.
|
||||
|
||||
Raises:
|
||||
RecoverableError: If a recoverable error occurs. The message or the
|
||||
transaction will be retried without restarting the plugin.
|
||||
Exception: If a non-recoverable error occurs. The plugin will be
|
||||
restarted before retrying the message or the transaction.
|
||||
"""
|
||||
logging.debug("onMessage called")
|
||||
|
||||
# It is recommended to check that the "useTransactions" flag is
|
||||
# appropriately configured in 'omprog'. If your plugin requires
|
||||
# transactions, you can check that they are enabled as follows:
|
||||
# global inTransaction
|
||||
# assert inTransaction, "This plugin requires transactions to be enabled"
|
||||
|
||||
# Otherwise, if your plugin does not support transactions, you can check
|
||||
# that they are disabled as follows:
|
||||
# global inTransaction
|
||||
# assert not inTransaction, "This plugin does not support transactions"
|
||||
|
||||
# For illustrative purposes, this plugin skeleton appends the received logs
|
||||
# to a file. When implementing your plugin, remove the following code.
|
||||
global outfile
|
||||
outfile.write(msg)
|
||||
outfile.write("\n")
|
||||
outfile.flush()
|
||||
|
||||
|
||||
def onCommitTransaction():
|
||||
"""Complete the processing of a batch of messages.
|
||||
|
||||
This function is invoked only when the "useTransactions" parameter is
|
||||
configured to "on" in the 'omprog' action.
|
||||
|
||||
You can implement this function to e.g. commit a database transaction.
|
||||
|
||||
Raises:
|
||||
RecoverableError: If a recoverable error occurs. The transaction
|
||||
will be retried without restarting the plugin.
|
||||
Exception: If a non-recoverable error occurs. The plugin will be
|
||||
restarted before retrying the transaction.
|
||||
"""
|
||||
logging.debug("onCommitTransaction called")
|
||||
|
||||
|
||||
def onRollbackTransaction():
|
||||
"""Cancel the processing of a batch of messages.
|
||||
|
||||
This function is invoked only when the "useTransactions" parameter is
|
||||
configured to "on" in the 'omprog' action, and when the "onMessage"
|
||||
function has raised a (recoverable or non-recoverable) error for one of
|
||||
the messages in the batch. It is also invoked if "onCommitTransaction"
|
||||
raises an error.
|
||||
|
||||
You can implement this function to e.g. rollback a database transaction.
|
||||
|
||||
This function should not raise any error. If it does, the error will be
|
||||
logged as a warning and ignored.
|
||||
"""
|
||||
logging.debug("onRollbackTransaction called")
|
||||
|
||||
|
||||
def onExit():
|
||||
"""Do everything that is needed to finish processing (e.g. close files,
|
||||
handles, disconnect from systems...). This is being called immediately
|
||||
before exiting.
|
||||
|
||||
This function should not raise any error. If it does, the error will be
|
||||
logged as a warning and ignored.
|
||||
"""
|
||||
logging.debug("onExit called")
|
||||
|
||||
# For illustrative purposes, this plugin skeleton appends the received logs
|
||||
# to a file. When implementing your plugin, remove the following code.
|
||||
global outfile
|
||||
outfile.close()
|
||||
|
||||
|
||||
"""
|
||||
-------------------------------------------------------
|
||||
This is plumbing that DOES NOT need to be CHANGED
|
||||
-------------------------------------------------------
|
||||
This is the main loop that receives messages from rsyslog via stdin,
|
||||
invokes the above entrypoints, and provides status codes to rsyslog
|
||||
via stdout. In most cases, modifying this code should not be necessary.
|
||||
|
||||
You will have to change the code below if you need the following
|
||||
advanced features:
|
||||
* Custom begin/end transaction marks: if you have configured the
|
||||
'omprog' action in rsyslog to use your own marks for transaction
|
||||
boundaries (instead of the default "BEGIN TRANSACTION" and "COMMIT
|
||||
TRANSACTION" messages), modify the code below accordingly.
|
||||
* Partial transaction commits: this skeleton confirms all messages
|
||||
inside a transaction using the "DEFER_COMMIT" status code. If you
|
||||
want to return the "PREVIOUS_COMMITED" or "OK" status codes within
|
||||
transactions, you will need to modify the code below. See
|
||||
http://www.rsyslog.com/doc/v8-stable/development/dev_oplugins.html
|
||||
for information about these status codes. Note that rsyslog will not
|
||||
send the "COMMIT TRANSACTION" mark if the last message in the
|
||||
transaction is confirmed with an "OK" status code.
|
||||
"""
|
||||
try:
|
||||
onInit()
|
||||
except Exception as e:
|
||||
# If an error occurs during initialization, log it and terminate. The
|
||||
# 'omprog' action will eventually restart the program.
|
||||
logging.exception("Initialization error, exiting program")
|
||||
sys.exit(1)
|
||||
|
||||
# Tell rsyslog we are ready to start processing messages:
|
||||
print("OK", flush=True)
|
||||
|
||||
inTransaction = False
|
||||
endedWithError = False
|
||||
try:
|
||||
line = sys.stdin.readline()
|
||||
while line:
|
||||
line = line.rstrip('\n')
|
||||
try:
|
||||
try:
|
||||
if line == "BEGIN TRANSACTION":
|
||||
onBeginTransaction()
|
||||
inTransaction = True
|
||||
status = "OK"
|
||||
elif line == "COMMIT TRANSACTION":
|
||||
onCommitTransaction()
|
||||
inTransaction = False
|
||||
status = "OK"
|
||||
else:
|
||||
onMessage(line)
|
||||
status = "DEFER_COMMIT" if inTransaction else "OK"
|
||||
|
||||
except Exception:
|
||||
# If a transaction was in progress, call onRollbackTransaction
|
||||
# to facilitate cleaning up the transaction state. Note that
|
||||
# rsyslog does not support this notification. (It probably
|
||||
# should, to allow the plugin to be notified in case the
|
||||
# transaction fails in the rsyslog side.)
|
||||
if inTransaction:
|
||||
try:
|
||||
onRollbackTransaction()
|
||||
except Exception as ignored:
|
||||
ignored.__suppress_context__ = True
|
||||
logging.warning("Exception ignored in onRollbackTransaction", exc_info=True)
|
||||
inTransaction = False
|
||||
raise
|
||||
|
||||
except RecoverableError as e:
|
||||
# Any line written to stdout that is not a status code will be
|
||||
# treated as a recoverable error by 'omprog', and cause the action
|
||||
# to be temporarily suspended. In this skeleton, we simply return
|
||||
# a one-line representation of the Python exception. (If debugging
|
||||
# is enabled in rsyslog, this line will appear in the debug logs.)
|
||||
status = repr(e)
|
||||
# We also log the complete exception to stderr (or to the logging
|
||||
# handler(s) configured in doInit, if any).
|
||||
logging.exception(e)
|
||||
inTransaction = False
|
||||
|
||||
# Send the status code (or the one-line error message) to rsyslog:
|
||||
print(status, flush=True)
|
||||
line = sys.stdin.readline()
|
||||
|
||||
except Exception:
|
||||
# If a non-recoverable error occurs, log it and terminate. The 'omprog'
|
||||
# action will eventually restart the program.
|
||||
logging.exception("Unrecoverable error, exiting program")
|
||||
endedWithError = True
|
||||
|
||||
try:
|
||||
onExit()
|
||||
except Exception:
|
||||
logging.warning("Exception ignored in onExit", exc_info=True)
|
||||
|
||||
if endedWithError:
|
||||
sys.exit(1)
|
||||
else:
|
||||
sys.exit(0)
|
||||
@ -13,11 +13,11 @@
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* -or-
|
||||
* see COPYING.ASL20 in the source distribution
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -66,24 +66,31 @@ DEFobjCurrIf(errmsg)
|
||||
#define NO_HUP_FORWARD -1 /* indicates that HUP should NOT be forwarded */
|
||||
/* linux specific: how long to wait for process to terminate gracefully before issuing SIGKILL */
|
||||
#define DEFAULT_FORCED_TERMINATION_TIMEOUT_MS 5000
|
||||
#define READLINE_BUFFER_SIZE 1024
|
||||
|
||||
typedef struct _instanceData {
|
||||
uchar *szBinary; /* name of binary to call */
|
||||
char **aParams; /* Optional Parameters for binary command */
|
||||
char **aParams; /* optional parameters for binary command */
|
||||
uchar *tplName; /* assigned output template */
|
||||
int iParams; /* Holds the count of parameters if set*/
|
||||
int iParams; /* holds the count of parameters if set */
|
||||
int bConfirmMessages; /* does the program provide feedback via stdout? */
|
||||
int bUseTransactions; /* send begin/end transaction marks to program? */
|
||||
uchar *szBeginTransactionMark; /* mark message for begin transaction */
|
||||
uchar *szCommitTransactionMark; /* mark message for commit transaction */
|
||||
int bForceSingleInst; /* only a single wrkr instance of program permitted? */
|
||||
int iHUPForward; /* signal to forward on HUP (or NO_HUP_FORWARD) */
|
||||
uchar *outputFileName; /* name of file for std[out/err] or NULL if to discard */
|
||||
int bSignalOnClose; /* should signal process at shutdown */
|
||||
int bSignalOnClose; /* should signal process at shutdown */
|
||||
uchar *outputFileName; /* name of file to write the program output to, or NULL */
|
||||
pthread_mutex_t mut; /* make sure only one instance is active */
|
||||
} instanceData;
|
||||
|
||||
typedef struct wrkrInstanceData {
|
||||
instanceData *pData;
|
||||
pid_t pid; /* pid of currently running process */
|
||||
int fdOutput; /* it's fd (-1 if closed) */
|
||||
int fdPipeOut; /* file descriptor to write to */
|
||||
int fdPipeIn; /* fd we receive messages from the program (if we want to) */
|
||||
int fdPipeOut; /* fd for sending messages to the program */
|
||||
int fdPipeIn; /* fd for receiving status messages from the program */
|
||||
int fdPipeErr; /* fd for receiving error output from the program */
|
||||
int fdOutputFile; /* fd to write the program output to (-1 if to discard) */
|
||||
int bIsRunning; /* is binary currently running? 0-no, 1-yes */
|
||||
} wrkrInstanceData_t;
|
||||
|
||||
@ -92,139 +99,32 @@ typedef struct configSettings_s {
|
||||
} configSettings_t;
|
||||
static configSettings_t cs;
|
||||
|
||||
|
||||
/* tables for interfacing with the v6 config system */
|
||||
/* action (instance) parameters */
|
||||
static struct cnfparamdescr actpdescr[] = {
|
||||
{ "binary", eCmdHdlrString, CNFPARAM_REQUIRED },
|
||||
{ "confirmMessages", eCmdHdlrBinary, 0 },
|
||||
{ "useTransactions", eCmdHdlrBinary, 0 },
|
||||
{ "beginTransactionMark", eCmdHdlrString, 0 },
|
||||
{ "commitTransactionMark", eCmdHdlrString, 0 },
|
||||
{ "output", eCmdHdlrString, 0 },
|
||||
{ "forcesingleinstance", eCmdHdlrBinary, 0 },
|
||||
{ "hup.signal", eCmdHdlrGetWord, 0 },
|
||||
{ "template", eCmdHdlrGetWord, 0 },
|
||||
{ "signalOnClose", eCmdHdlrBinary, 0 }
|
||||
};
|
||||
|
||||
static struct cnfparamblk actpblk =
|
||||
{ CNFPARAMBLK_VERSION,
|
||||
sizeof(actpdescr)/sizeof(struct cnfparamdescr),
|
||||
actpdescr
|
||||
};
|
||||
|
||||
BEGINinitConfVars /* (re)set config variables to default values */
|
||||
CODESTARTinitConfVars
|
||||
cs.szBinary = NULL; /* name of binary to call */
|
||||
ENDinitConfVars
|
||||
|
||||
/* config settings */
|
||||
|
||||
BEGINcreateInstance
|
||||
CODESTARTcreateInstance
|
||||
pthread_mutex_init(&pData->mut, NULL);
|
||||
ENDcreateInstance
|
||||
|
||||
BEGINcreateWrkrInstance
|
||||
CODESTARTcreateWrkrInstance
|
||||
pWrkrData->fdPipeIn = -1;
|
||||
pWrkrData->fdPipeOut = -1;
|
||||
pWrkrData->fdOutput = -1;
|
||||
pWrkrData->bIsRunning = 0;
|
||||
ENDcreateWrkrInstance
|
||||
|
||||
|
||||
BEGINisCompatibleWithFeature
|
||||
CODESTARTisCompatibleWithFeature
|
||||
if(eFeat == sFEATURERepeatedMsgReduction)
|
||||
iRet = RS_RET_OK;
|
||||
ENDisCompatibleWithFeature
|
||||
|
||||
|
||||
BEGINfreeInstance
|
||||
int i;
|
||||
CODESTARTfreeInstance
|
||||
pthread_mutex_destroy(&pData->mut);
|
||||
free(pData->szBinary);
|
||||
free(pData->outputFileName);
|
||||
free(pData->tplName);
|
||||
if(pData->aParams != NULL) {
|
||||
for (i = 0; i < pData->iParams; i++) {
|
||||
free(pData->aParams[i]);
|
||||
}
|
||||
free(pData->aParams);
|
||||
}
|
||||
ENDfreeInstance
|
||||
|
||||
BEGINdbgPrintInstInfo
|
||||
CODESTARTdbgPrintInstInfo
|
||||
ENDdbgPrintInstInfo
|
||||
|
||||
|
||||
BEGINtryResume
|
||||
CODESTARTtryResume
|
||||
ENDtryResume
|
||||
|
||||
|
||||
/* As this is assume to be a debug function, we only make
|
||||
* best effort to write the message but do *not* try very
|
||||
* hard to handle errors. -- rgerhards, 2014-01-16
|
||||
*/
|
||||
static void
|
||||
writeProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData,
|
||||
const char *__restrict__ const buf,
|
||||
const ssize_t lenBuf)
|
||||
{
|
||||
char errStr[1024];
|
||||
ssize_t r;
|
||||
|
||||
if(pWrkrData->fdOutput == -1) {
|
||||
pWrkrData->fdOutput = open((char*)pWrkrData->pData->outputFileName,
|
||||
O_WRONLY | O_APPEND | O_CREAT, 0600);
|
||||
if(pWrkrData->fdOutput == -1) {
|
||||
DBGPRINTF("omprog: error opening output file %s: %s\n",
|
||||
pWrkrData->pData->outputFileName,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
|
||||
r = write(pWrkrData->fdOutput, buf, (size_t) lenBuf);
|
||||
if(r != lenBuf) {
|
||||
DBGPRINTF("omprog: problem writing output file %s: bytes "
|
||||
"requested %lld, written %lld, msg: %s\n",
|
||||
pWrkrData->pData->outputFileName, (long long) lenBuf, (long long) r,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
}
|
||||
done: return;
|
||||
}
|
||||
|
||||
|
||||
/* check output of the executed program
|
||||
* If configured to care about the output, we check if there is some and,
|
||||
* if so, properly handle it.
|
||||
*/
|
||||
static void
|
||||
checkProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData)
|
||||
{
|
||||
char buf[4096];
|
||||
ssize_t r;
|
||||
|
||||
if(pWrkrData->fdPipeIn == -1)
|
||||
goto done;
|
||||
|
||||
do {
|
||||
r = read(pWrkrData->fdPipeIn, buf, sizeof(buf));
|
||||
if(r > 0)
|
||||
writeProgramOutput(pWrkrData, buf, r);
|
||||
} while(r > 0);
|
||||
|
||||
done: return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* execute the child process (must be called in child context
|
||||
* after fork).
|
||||
*/
|
||||
static __attribute__((noreturn)) void
|
||||
execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdOutErr)
|
||||
execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdout, int fdStderr)
|
||||
{
|
||||
int i, iRet;
|
||||
struct sigaction sigAct;
|
||||
@ -233,24 +133,39 @@ execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdOutErr)
|
||||
char *newenviron[] = { NULL };
|
||||
char *emptyArgv[] = { NULL };
|
||||
|
||||
fclose(stdin);
|
||||
if(dup(fdStdin) == -1) {
|
||||
if(dup2(fdStdin, STDIN_FILENO) == -1) {
|
||||
DBGPRINTF("omprog: dup() stdin failed\n");
|
||||
/* do some more error handling here? Maybe if the module
|
||||
* gets some more widespread use...
|
||||
*/
|
||||
}
|
||||
if(pWrkrData->pData->outputFileName == NULL) {
|
||||
close(fdStdOutErr);
|
||||
} else {
|
||||
close(1);
|
||||
if(dup(fdStdOutErr) == -1) {
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
/* send message confirmations via stdout */
|
||||
if(dup2(fdStdout, STDOUT_FILENO) == -1) {
|
||||
DBGPRINTF("omprog: dup() stdout failed\n");
|
||||
}
|
||||
close(2);
|
||||
if(dup(fdStdOutErr) == -1) {
|
||||
/* redirect stderr to the output file, if specified */
|
||||
if (pWrkrData->pData->outputFileName != NULL) {
|
||||
if(dup2(fdStderr, STDERR_FILENO) == -1) {
|
||||
DBGPRINTF("omprog: dup() stderr failed\n");
|
||||
}
|
||||
} else {
|
||||
close(fdStderr);
|
||||
}
|
||||
} else if (pWrkrData->pData->outputFileName != NULL) {
|
||||
/* redirect both stdout and stderr to the output file */
|
||||
if(dup2(fdStderr, STDOUT_FILENO) == -1) {
|
||||
DBGPRINTF("omprog: dup() stdout failed\n");
|
||||
}
|
||||
if(dup2(fdStderr, STDERR_FILENO) == -1) {
|
||||
DBGPRINTF("omprog: dup() stderr failed\n");
|
||||
}
|
||||
close(fdStdout);
|
||||
} else {
|
||||
/* no need to send data to parent via stdout or stderr */
|
||||
close(fdStdout);
|
||||
close(fdStderr);
|
||||
}
|
||||
|
||||
/* we close all file handles as we fork soon
|
||||
@ -294,28 +209,31 @@ execBinary(wrkrInstanceData_t *pWrkrData, int fdStdin, int fdStdOutErr)
|
||||
syslog(LOG_ERR, "omprog: failed to execute binary '%s': %s\n",
|
||||
pWrkrData->pData->szBinary, errStr);
|
||||
}
|
||||
|
||||
|
||||
/* we should never reach this point, but if we do, we terminate */
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
||||
/* creates a pipe and starts program, uses pipe as stdin for program.
|
||||
* rgerhards, 2009-04-01
|
||||
*/
|
||||
static rsRetVal
|
||||
openPipe(wrkrInstanceData_t *pWrkrData)
|
||||
{
|
||||
int pipestdin[2];
|
||||
int pipestdout[2];
|
||||
int pipeStdin[2];
|
||||
int pipeStdout[2];
|
||||
int pipeStderr[2];
|
||||
pid_t cpid;
|
||||
int flags;
|
||||
DEFiRet;
|
||||
|
||||
if(pipe(pipestdin) == -1) {
|
||||
if(pipe(pipeStdin) == -1) {
|
||||
ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
|
||||
}
|
||||
if(pipe(pipestdout) == -1) {
|
||||
if(pipe(pipeStdout) == -1) {
|
||||
ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
|
||||
}
|
||||
if(pipe(pipeStderr) == -1) {
|
||||
ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
|
||||
}
|
||||
|
||||
@ -330,34 +248,115 @@ openPipe(wrkrInstanceData_t *pWrkrData)
|
||||
}
|
||||
pWrkrData->pid = cpid;
|
||||
|
||||
if(cpid == 0) {
|
||||
if(cpid == 0) {
|
||||
/* we are now the child, just exec the binary. */
|
||||
close(pipestdin[1]); /* close those pipe "ports" that */
|
||||
close(pipestdout[0]); /* we don't need */
|
||||
execBinary(pWrkrData, pipestdin[0], pipestdout[1]);
|
||||
close(pipeStdin[1]); /* close those pipe "ports" that */
|
||||
close(pipeStdout[0]); /* ... we don't need */
|
||||
close(pipeStderr[0]);
|
||||
execBinary(pWrkrData, pipeStdin[0], pipeStdout[1], pipeStderr[1]);
|
||||
/*NO CODE HERE - WILL NEVER BE REACHED!*/
|
||||
}
|
||||
|
||||
DBGPRINTF("omprog: child has pid %d\n", (int) cpid);
|
||||
if(pWrkrData->pData->outputFileName != NULL) {
|
||||
pWrkrData->fdPipeIn = pipestdout[0];
|
||||
/* we need to set our fd to be non-blocking! */
|
||||
flags = fcntl(pWrkrData->fdPipeIn, F_GETFL);
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(pWrkrData->fdPipeIn, F_SETFL, flags);
|
||||
} else {
|
||||
|
||||
close(pipeStdin[0]);
|
||||
close(pipeStdout[1]);
|
||||
close(pipeStderr[1]);
|
||||
|
||||
/* we'll send messages to the program via fdPipeOut */
|
||||
pWrkrData->fdPipeOut = pipeStdin[1];
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
/* we'll receive message confirmations via fdPipeIn */
|
||||
pWrkrData->fdPipeIn = pipeStdout[0];
|
||||
/* we'll capture stderr to the output file, if specified */
|
||||
if (pWrkrData->pData->outputFileName != NULL) {
|
||||
pWrkrData->fdPipeErr = pipeStderr[0];
|
||||
}
|
||||
else {
|
||||
pWrkrData->fdPipeErr = -1;
|
||||
}
|
||||
} else if (pWrkrData->pData->outputFileName != NULL) {
|
||||
/* we'll capture both stdout and stderr via fdPipeErr */
|
||||
close(pipeStdout[0]);
|
||||
pWrkrData->fdPipeIn = -1;
|
||||
close(pipestdout[0]);
|
||||
pWrkrData->fdPipeErr = pipeStderr[0];
|
||||
} else {
|
||||
/* no need to read the program stdout or stderr */
|
||||
close(pipeStdout[0]);
|
||||
close(pipeStderr[0]);
|
||||
pWrkrData->fdPipeIn = -1;
|
||||
pWrkrData->fdPipeErr = -1;
|
||||
}
|
||||
close(pipestdin[0]);
|
||||
close(pipestdout[1]);
|
||||
pWrkrData->pid = cpid;
|
||||
pWrkrData->fdPipeOut = pipestdin[1];
|
||||
|
||||
if(pWrkrData->fdPipeErr != -1) {
|
||||
/* set our fd to be non-blocking */
|
||||
flags = fcntl(pWrkrData->fdPipeErr, F_GETFL);
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(pWrkrData->fdPipeErr, F_SETFL, flags);
|
||||
}
|
||||
|
||||
pWrkrData->bIsRunning = 1;
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
/* As this is assume to be a debug function, we only make
|
||||
* best effort to write the message but do *not* try very
|
||||
* hard to handle errors. -- rgerhards, 2014-01-16
|
||||
*/
|
||||
static void
|
||||
writeProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData,
|
||||
const char *__restrict__ const buf,
|
||||
const ssize_t lenBuf)
|
||||
{
|
||||
char errStr[1024];
|
||||
ssize_t r;
|
||||
|
||||
if(pWrkrData->fdOutputFile == -1) {
|
||||
pWrkrData->fdOutputFile = open((char*)pWrkrData->pData->outputFileName,
|
||||
O_WRONLY | O_APPEND | O_CREAT, 0600);
|
||||
if(pWrkrData->fdOutputFile == -1) {
|
||||
DBGPRINTF("omprog: error opening output file %s: %s\n",
|
||||
pWrkrData->pData->outputFileName,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
|
||||
r = write(pWrkrData->fdOutputFile, buf, (size_t) lenBuf);
|
||||
if(r != lenBuf) {
|
||||
DBGPRINTF("omprog: problem writing output file %s: bytes "
|
||||
"requested %lld, written %lld, msg: %s\n",
|
||||
pWrkrData->pData->outputFileName, (long long) lenBuf, (long long) r,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
}
|
||||
done: return;
|
||||
}
|
||||
|
||||
/* check output of the executed program
|
||||
* If configured to care about the output, we check if there is some and,
|
||||
* if so, properly handle it.
|
||||
*/
|
||||
static void
|
||||
checkProgramOutput(wrkrInstanceData_t *__restrict__ const pWrkrData)
|
||||
{
|
||||
char buf[4096];
|
||||
ssize_t r;
|
||||
|
||||
if(pWrkrData->fdPipeErr == -1)
|
||||
goto done;
|
||||
|
||||
do {
|
||||
r = read(pWrkrData->fdPipeErr, buf, sizeof(buf));
|
||||
if(r > 0) {
|
||||
writeProgramOutput(pWrkrData, buf, r);
|
||||
}
|
||||
} while(r > 0);
|
||||
|
||||
done: return;
|
||||
}
|
||||
|
||||
#if defined(__linux__) && defined(_GNU_SOURCE)
|
||||
typedef struct subprocess_timeout_desc_s {
|
||||
pthread_attr_t thd_attr;
|
||||
@ -370,7 +369,8 @@ typedef struct subprocess_timeout_desc_s {
|
||||
struct timespec timeout;
|
||||
} subprocess_timeout_desc_t;
|
||||
|
||||
static void * killSubprocessOnTimeout(void *_subpTimeOut_p) {
|
||||
static void *
|
||||
killSubprocessOnTimeout(void *_subpTimeOut_p) {
|
||||
subprocess_timeout_desc_t *subpTimeOut = (subprocess_timeout_desc_t *) _subpTimeOut_p;
|
||||
if (pthread_mutex_lock(&subpTimeOut->lock) == 0) {
|
||||
while (subpTimeOut->timeout_armed) {
|
||||
@ -417,11 +417,11 @@ finalize_it:
|
||||
static void
|
||||
doForceKillSubprocess(subprocess_timeout_desc_t *subpTimeOut, int do_kill, pid_t pid)
|
||||
{
|
||||
if (pthread_mutex_lock(&subpTimeOut->lock) == 0) {
|
||||
subpTimeOut->timeout_armed = 0;
|
||||
pthread_cond_signal(&subpTimeOut->cond);
|
||||
pthread_mutex_unlock(&subpTimeOut->lock);
|
||||
}
|
||||
if (pthread_mutex_lock(&subpTimeOut->lock) == 0) {
|
||||
subpTimeOut->timeout_armed = 0;
|
||||
pthread_cond_signal(&subpTimeOut->cond);
|
||||
pthread_mutex_unlock(&subpTimeOut->lock);
|
||||
}
|
||||
pthread_join(subpTimeOut->thd, NULL);
|
||||
if (do_kill) {
|
||||
if (kill(pid, 9) == 0) {
|
||||
@ -447,7 +447,8 @@ waitForChild(wrkrInstanceData_t *pWrkrData, long timeout_ms)
|
||||
int timeoutSetupStatus;
|
||||
int waitpid_interrupted;
|
||||
|
||||
if (timeout_ms > 0) timeoutSetupStatus = setupSubprocessTimeout(&subpTimeOut, timeout_ms);
|
||||
if (timeout_ms > 0)
|
||||
timeoutSetupStatus = setupSubprocessTimeout(&subpTimeOut, timeout_ms);
|
||||
#endif
|
||||
|
||||
ret = waitpid(pWrkrData->pid, &status, 0);
|
||||
@ -483,24 +484,26 @@ waitForChild(wrkrInstanceData_t *pWrkrData, long timeout_ms)
|
||||
}
|
||||
}
|
||||
|
||||
/* clean up after a terminated child
|
||||
/* clean up the state after a terminated child
|
||||
*/
|
||||
static rsRetVal
|
||||
cleanup(wrkrInstanceData_t *pWrkrData, long timeout_ms)
|
||||
static void
|
||||
cleanupChild(wrkrInstanceData_t *pWrkrData, long timeout_ms)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
assert(pWrkrData->bIsRunning == 1);
|
||||
|
||||
if (pWrkrData->pData->bSignalOnClose) {
|
||||
if(pWrkrData->pData->bSignalOnClose) {
|
||||
waitForChild(pWrkrData, timeout_ms);
|
||||
}
|
||||
|
||||
checkProgramOutput(pWrkrData); /* try to catch any late messages */
|
||||
checkProgramOutput(pWrkrData); /* try to catch any late messages */
|
||||
|
||||
if(pWrkrData->fdOutput != -1) {
|
||||
close(pWrkrData->fdOutput);
|
||||
pWrkrData->fdOutput = -1;
|
||||
if(pWrkrData->fdOutputFile != -1) {
|
||||
close(pWrkrData->fdOutputFile);
|
||||
pWrkrData->fdOutputFile = -1;
|
||||
}
|
||||
if(pWrkrData->fdPipeErr != -1) {
|
||||
close(pWrkrData->fdPipeErr);
|
||||
pWrkrData->fdPipeErr = -1;
|
||||
}
|
||||
if(pWrkrData->fdPipeIn != -1) {
|
||||
close(pWrkrData->fdPipeIn);
|
||||
@ -511,31 +514,20 @@ cleanup(wrkrInstanceData_t *pWrkrData, long timeout_ms)
|
||||
pWrkrData->fdPipeOut = -1;
|
||||
}
|
||||
pWrkrData->bIsRunning = 0;
|
||||
pWrkrData->bIsRunning = 0;
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
BEGINfreeWrkrInstance
|
||||
CODESTARTfreeWrkrInstance
|
||||
if (pWrkrData->bIsRunning) {
|
||||
if (pWrkrData->pData->bSignalOnClose) {
|
||||
kill(pWrkrData->pid, SIGTERM);
|
||||
}
|
||||
CHKiRet(cleanup(pWrkrData, DEFAULT_FORCED_TERMINATION_TIMEOUT_MS));
|
||||
}
|
||||
finalize_it:
|
||||
ENDfreeWrkrInstance
|
||||
|
||||
/* try to restart the binary when it has stopped.
|
||||
/* Send SIGTERM to child process (if configured to do so), wait for
|
||||
* termination, and clean up the state.
|
||||
*/
|
||||
static rsRetVal
|
||||
tryRestart(wrkrInstanceData_t *pWrkrData)
|
||||
static void
|
||||
terminateChild(wrkrInstanceData_t *pWrkrData)
|
||||
{
|
||||
DEFiRet;
|
||||
assert(pWrkrData->bIsRunning == 0);
|
||||
assert(pWrkrData->bIsRunning == 1);
|
||||
|
||||
iRet = openPipe(pWrkrData);
|
||||
RETiRet;
|
||||
if (pWrkrData->pData->bSignalOnClose) {
|
||||
kill(pWrkrData->pid, SIGTERM);
|
||||
}
|
||||
cleanupChild(pWrkrData, DEFAULT_FORCED_TERMINATION_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
/* write to pipe
|
||||
@ -551,7 +543,7 @@ writePipe(wrkrInstanceData_t *pWrkrData, uchar *szMsg)
|
||||
int writeOffset;
|
||||
char errStr[1024];
|
||||
DEFiRet;
|
||||
|
||||
|
||||
lenWrite = strlen((char*)szMsg);
|
||||
writeOffset = 0;
|
||||
|
||||
@ -559,22 +551,18 @@ writePipe(wrkrInstanceData_t *pWrkrData, uchar *szMsg)
|
||||
checkProgramOutput(pWrkrData);
|
||||
lenWritten = write(pWrkrData->fdPipeOut, ((char*)szMsg)+writeOffset, lenWrite);
|
||||
if(lenWritten == -1) {
|
||||
switch(errno) {
|
||||
case EPIPE:
|
||||
DBGPRINTF("omprog: program '%s' terminated, trying to restart\n",
|
||||
if(errno == EPIPE) {
|
||||
DBGPRINTF("omprog: program '%s' terminated, will be restarted\n",
|
||||
pWrkrData->pData->szBinary);
|
||||
CHKiRet(cleanup(pWrkrData, 0));
|
||||
CHKiRet(tryRestart(pWrkrData));
|
||||
break;
|
||||
default:
|
||||
/* force restart in tryResume() */
|
||||
cleanupChild(pWrkrData, 0);
|
||||
} else {
|
||||
DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
writeOffset += lenWritten;
|
||||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||||
}
|
||||
writeOffset += lenWritten;
|
||||
} while(lenWritten != lenWrite);
|
||||
|
||||
checkProgramOutput(pWrkrData);
|
||||
@ -583,36 +571,285 @@ finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
/* Reads a line from a blocking pipe, using the unistd.h read() function.
|
||||
* Returns the line as a null-terminated string in *lineptr, not including
|
||||
* the \n or \r\n terminator.
|
||||
* On success, returns the line length.
|
||||
* On error, returns -1 and sets errno.
|
||||
* On EOF, returns -1 and sets errno to EPIPE.
|
||||
* On success, the caller is responsible for freeing the returned line buffer.
|
||||
*/
|
||||
static ssize_t
|
||||
readline(int fd, char **lineptr)
|
||||
{
|
||||
char *buf = NULL;
|
||||
char *new_buf;
|
||||
size_t buf_size = 0;
|
||||
size_t len = 0;
|
||||
ssize_t nr;
|
||||
char ch;
|
||||
|
||||
nr = read(fd, &ch, 1);
|
||||
while (nr == 1 && ch != '\n') {
|
||||
if (len == buf_size) {
|
||||
buf_size += READLINE_BUFFER_SIZE;
|
||||
new_buf = (char*) realloc(buf, buf_size);
|
||||
if (new_buf == NULL) {
|
||||
free(buf);
|
||||
errno = ENOMEM;
|
||||
return -1;
|
||||
}
|
||||
buf = new_buf;
|
||||
}
|
||||
|
||||
buf[len++] = ch;
|
||||
nr = read(fd, &ch, 1);
|
||||
}
|
||||
|
||||
if (nr == 0) { /* EOF */
|
||||
free(buf);
|
||||
errno = EPIPE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (nr == -1) {
|
||||
free(buf);
|
||||
return -1; /* errno already set by 'read' */
|
||||
}
|
||||
|
||||
/* Ignore \r (if any) before \n */
|
||||
if (len > 0 && buf[len-1] == '\r') {
|
||||
--len;
|
||||
}
|
||||
|
||||
/* If necessary, make room for the null terminator */
|
||||
if (len == buf_size) {
|
||||
new_buf = (char*) realloc(buf, ++buf_size);
|
||||
if (new_buf == NULL) {
|
||||
free(buf);
|
||||
errno = ENOMEM;
|
||||
return -1;
|
||||
}
|
||||
buf = new_buf;
|
||||
}
|
||||
|
||||
buf[len] = '\0';
|
||||
*lineptr = buf;
|
||||
return len;
|
||||
}
|
||||
|
||||
static rsRetVal
|
||||
lineToStatusCode(wrkrInstanceData_t *pWrkrData, const char* line)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
if(strcmp(line, "OK") == 0) {
|
||||
iRet = RS_RET_OK;
|
||||
} else if(strcmp(line, "DEFER_COMMIT") == 0) {
|
||||
iRet = RS_RET_DEFER_COMMIT;
|
||||
} else if(strcmp(line, "PREVIOUS_COMMITTED") == 0) {
|
||||
iRet = RS_RET_PREVIOUS_COMMITTED;
|
||||
} else {
|
||||
/* anything else is considered a recoverable error */
|
||||
DBGPRINTF("omprog: program '%s' returned: %s\n",
|
||||
pWrkrData->pData->szBinary, line);
|
||||
iRet = RS_RET_SUSPENDED;
|
||||
}
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static rsRetVal
|
||||
readPipe(wrkrInstanceData_t *pWrkrData)
|
||||
{
|
||||
char *line;
|
||||
ssize_t lineLen;
|
||||
char errStr[1024];
|
||||
DEFiRet;
|
||||
|
||||
lineLen = readline(pWrkrData->fdPipeIn, &line);
|
||||
if (lineLen == -1) {
|
||||
if (errno == EPIPE) {
|
||||
DBGPRINTF("omprog: program '%s' terminated, will be restarted\n",
|
||||
pWrkrData->pData->szBinary);
|
||||
/* force restart in tryResume() */
|
||||
cleanupChild(pWrkrData, 0);
|
||||
} else {
|
||||
DBGPRINTF("omprog: error %d reading from pipe: %s\n", errno,
|
||||
rs_strerror_r(errno, errStr, sizeof(errStr)));
|
||||
}
|
||||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||||
}
|
||||
|
||||
iRet = lineToStatusCode(pWrkrData, line);
|
||||
free(line);
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
static rsRetVal
|
||||
startChild(wrkrInstanceData_t *pWrkrData)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
assert(pWrkrData->bIsRunning == 0);
|
||||
|
||||
CHKiRet(openPipe(pWrkrData));
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
/* wait for program to confirm successful initialization */
|
||||
CHKiRet(readPipe(pWrkrData));
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
if (iRet != RS_RET_OK && pWrkrData->bIsRunning) {
|
||||
/* if initialization has failed, terminate program */
|
||||
terminateChild(pWrkrData);
|
||||
}
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
BEGINinitConfVars /* (re)set config variables to default values */
|
||||
CODESTARTinitConfVars
|
||||
cs.szBinary = NULL; /* name of binary to call */
|
||||
ENDinitConfVars
|
||||
|
||||
|
||||
BEGINcreateInstance
|
||||
CODESTARTcreateInstance
|
||||
pthread_mutex_init(&pData->mut, NULL);
|
||||
ENDcreateInstance
|
||||
|
||||
|
||||
BEGINcreateWrkrInstance
|
||||
CODESTARTcreateWrkrInstance
|
||||
pWrkrData->fdPipeOut = -1;
|
||||
pWrkrData->fdPipeIn = -1;
|
||||
pWrkrData->fdPipeErr = -1;
|
||||
pWrkrData->fdOutputFile = -1;
|
||||
pWrkrData->bIsRunning = 0;
|
||||
|
||||
CHKiRet(startChild(pWrkrData));
|
||||
finalize_it:
|
||||
ENDcreateWrkrInstance
|
||||
|
||||
|
||||
BEGINisCompatibleWithFeature
|
||||
CODESTARTisCompatibleWithFeature
|
||||
if(eFeat == sFEATURERepeatedMsgReduction)
|
||||
iRet = RS_RET_OK;
|
||||
ENDisCompatibleWithFeature
|
||||
|
||||
|
||||
BEGINdbgPrintInstInfo
|
||||
CODESTARTdbgPrintInstInfo
|
||||
ENDdbgPrintInstInfo
|
||||
|
||||
|
||||
BEGINtryResume
|
||||
CODESTARTtryResume
|
||||
if (pWrkrData->bIsRunning == 0) {
|
||||
CHKiRet(startChild(pWrkrData));
|
||||
}
|
||||
finalize_it:
|
||||
ENDtryResume
|
||||
|
||||
|
||||
BEGINbeginTransaction
|
||||
CODESTARTbeginTransaction
|
||||
if(!pWrkrData->pData->bUseTransactions) {
|
||||
FINALIZE;
|
||||
}
|
||||
|
||||
CHKiRet(writePipe(pWrkrData, pWrkrData->pData->szBeginTransactionMark));
|
||||
CHKiRet(writePipe(pWrkrData, (uchar*) "\n"));
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
CHKiRet(readPipe(pWrkrData));
|
||||
}
|
||||
finalize_it:
|
||||
ENDbeginTransaction
|
||||
|
||||
|
||||
BEGINdoAction
|
||||
instanceData *pData;
|
||||
CODESTARTdoAction
|
||||
pData = pWrkrData->pData;
|
||||
if(pData->bForceSingleInst)
|
||||
pthread_mutex_lock(&pData->mut);
|
||||
if(pWrkrData->bIsRunning == 0) {
|
||||
openPipe(pWrkrData);
|
||||
if(pWrkrData->pData->bForceSingleInst) {
|
||||
pthread_mutex_lock(&pWrkrData->pData->mut);
|
||||
}
|
||||
if(pWrkrData->bIsRunning == 0) { /* should not occur */
|
||||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||||
}
|
||||
|
||||
iRet = writePipe(pWrkrData, ppString[0]);
|
||||
|
||||
if(iRet != RS_RET_OK)
|
||||
iRet = RS_RET_SUSPENDED;
|
||||
if(pData->bForceSingleInst)
|
||||
pthread_mutex_unlock(&pData->mut);
|
||||
CHKiRet(writePipe(pWrkrData, ppString[0]));
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
CHKiRet(readPipe(pWrkrData));
|
||||
} else if(pWrkrData->pData->bUseTransactions) {
|
||||
/* ensure endTransaction will be called */
|
||||
iRet = RS_RET_DEFER_COMMIT;
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
if(pWrkrData->pData->bForceSingleInst) {
|
||||
pthread_mutex_unlock(&pWrkrData->pData->mut);
|
||||
}
|
||||
ENDdoAction
|
||||
|
||||
|
||||
BEGINendTransaction
|
||||
CODESTARTendTransaction
|
||||
if(!pWrkrData->pData->bUseTransactions) {
|
||||
FINALIZE;
|
||||
}
|
||||
|
||||
CHKiRet(writePipe(pWrkrData, pWrkrData->pData->szCommitTransactionMark));
|
||||
CHKiRet(writePipe(pWrkrData, (uchar*) "\n"));
|
||||
|
||||
if(pWrkrData->pData->bConfirmMessages) {
|
||||
CHKiRet(readPipe(pWrkrData));
|
||||
}
|
||||
finalize_it:
|
||||
ENDendTransaction
|
||||
|
||||
|
||||
BEGINfreeWrkrInstance
|
||||
CODESTARTfreeWrkrInstance
|
||||
if (pWrkrData->bIsRunning) {
|
||||
terminateChild(pWrkrData);
|
||||
}
|
||||
ENDfreeWrkrInstance
|
||||
|
||||
|
||||
BEGINfreeInstance
|
||||
int i;
|
||||
CODESTARTfreeInstance
|
||||
pthread_mutex_destroy(&pData->mut);
|
||||
free(pData->szBinary);
|
||||
free(pData->outputFileName);
|
||||
free(pData->tplName);
|
||||
if(pData->aParams != NULL) {
|
||||
for (i = 0; i < pData->iParams; i++) {
|
||||
free(pData->aParams[i]);
|
||||
}
|
||||
free(pData->aParams);
|
||||
}
|
||||
ENDfreeInstance
|
||||
|
||||
|
||||
static void
|
||||
setInstParamDefaults(instanceData *pData)
|
||||
{
|
||||
pData->szBinary = NULL;
|
||||
pData->aParams = NULL;
|
||||
pData->outputFileName = NULL;
|
||||
pData->iParams = 0;
|
||||
pData->bConfirmMessages = 0;
|
||||
pData->bUseTransactions = 0;
|
||||
pData->szBeginTransactionMark = (uchar*) "BEGIN TRANSACTION";
|
||||
pData->szCommitTransactionMark = (uchar*) "COMMIT TRANSACTION";
|
||||
pData->bForceSingleInst = 0;
|
||||
pData->bSignalOnClose = 0;
|
||||
pData->iHUPForward = NO_HUP_FORWARD;
|
||||
pData->bSignalOnClose = 0;
|
||||
pData->outputFileName = NULL;
|
||||
}
|
||||
|
||||
BEGINnewActInst
|
||||
@ -653,7 +890,7 @@ CODESTARTnewActInst
|
||||
break;
|
||||
}
|
||||
iCnt++;
|
||||
}
|
||||
}
|
||||
/* Assign binary and params */
|
||||
pData->szBinary = (uchar*)es_str2cstr(estrBinary, NULL);
|
||||
DBGPRINTF("omprog: szBinary = '%s'\n", pData->szBinary);
|
||||
@ -664,7 +901,7 @@ CODESTARTnewActInst
|
||||
dbgprintf("omprog: szParams = '%s'\n", params);
|
||||
free(params);
|
||||
}
|
||||
|
||||
|
||||
/* Count parameters if set */
|
||||
c = es_getBufAddr(estrParams); /* Reset to beginning */
|
||||
pData->iParams = 2; /* Set default to 2, first parameter for binary and second parameter at least from config*/
|
||||
@ -677,7 +914,7 @@ CODESTARTnewActInst
|
||||
DBGPRINTF("omprog: iParams = '%d'\n", pData->iParams);
|
||||
|
||||
/* Create argv Array */
|
||||
CHKmalloc(pData->aParams = malloc( (pData->iParams+1) * sizeof(char*))); /* One more for first param */
|
||||
CHKmalloc(pData->aParams = malloc( (pData->iParams+1) * sizeof(char*))); /* One more for first param */
|
||||
|
||||
/* Second Loop, create parameter array*/
|
||||
c = es_getBufAddr(estrParams); /* Reset to beginning */
|
||||
@ -716,8 +953,15 @@ CODESTARTnewActInst
|
||||
}
|
||||
/* NULL last parameter! */
|
||||
pData->aParams[iPrm] = NULL;
|
||||
|
||||
}
|
||||
} else if(!strcmp(actpblk.descr[i].name, "confirmMessages")) {
|
||||
pData->bConfirmMessages = (int) pvals[i].val.d.n;
|
||||
} else if(!strcmp(actpblk.descr[i].name, "useTransactions")) {
|
||||
pData->bUseTransactions = (int) pvals[i].val.d.n;
|
||||
} else if(!strcmp(actpblk.descr[i].name, "beginTransactionMark")) {
|
||||
pData->szBeginTransactionMark = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(actpblk.descr[i].name, "commitTransactionMark")) {
|
||||
pData->szCommitTransactionMark = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(actpblk.descr[i].name, "output")) {
|
||||
pData->outputFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(actpblk.descr[i].name, "forcesingleinstance")) {
|
||||
@ -749,7 +993,7 @@ CODESTARTnewActInst
|
||||
}
|
||||
}
|
||||
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
|
||||
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
|
||||
"RSYSLOG_FileFormat" : (char*)pData->tplName),
|
||||
OMSR_NO_RQD_TPL_OPTS));
|
||||
DBGPRINTF("omprog: bForceSingleInst %d\n", pData->bForceSingleInst);
|
||||
@ -757,6 +1001,7 @@ CODE_STD_FINALIZERnewActInst
|
||||
cnfparamvalsDestruct(pvals, &actpblk);
|
||||
ENDnewActInst
|
||||
|
||||
|
||||
BEGINparseSelectorAct
|
||||
CODESTARTparseSelectorAct
|
||||
CODE_STD_STRING_REQUESTparseSelectorAct(1)
|
||||
@ -812,13 +1057,12 @@ BEGINqueryEtryPt
|
||||
CODESTARTqueryEtryPt
|
||||
CODEqueryEtryPt_STD_OMOD_QUERIES
|
||||
CODEqueryEtryPt_STD_OMOD8_QUERIES
|
||||
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
|
||||
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
|
||||
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface */
|
||||
CODEqueryEtryPt_doHUPWrkr
|
||||
ENDqueryEtryPt
|
||||
|
||||
|
||||
|
||||
/* Reset config variables for this module to default values.
|
||||
*/
|
||||
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
|
||||
@ -829,19 +1073,24 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
BEGINmodInit()
|
||||
CODESTARTmodInit
|
||||
INITLegCnfVars
|
||||
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
|
||||
CODEmodInit_QueryRegCFSLineHdlr
|
||||
/* tell engine which objects we need */
|
||||
CHKiRet(objUse(errmsg, CORE_COMPONENT));
|
||||
|
||||
/* check that rsyslog core supports transactional plugins */
|
||||
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
|
||||
if (!bCoreSupportsBatching) {
|
||||
errmsg.LogError(0, NO_ERRCODE, "omprog: rsyslog core too old (does not support batching)");
|
||||
ABORT_FINALIZE(RS_RET_ERR);
|
||||
}
|
||||
|
||||
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomprogbinary", 0, eCmdHdlrGetWord, NULL, &cs.szBinary,
|
||||
STD_LOADABLE_MODULE_ID));
|
||||
STD_LOADABLE_MODULE_ID));
|
||||
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables,
|
||||
NULL, STD_LOADABLE_MODULE_ID));
|
||||
NULL, STD_LOADABLE_MODULE_ID));
|
||||
CODEmodInit_QueryRegCFSLineHdlr
|
||||
ENDmodInit
|
||||
|
||||
/* vi:set ai:
|
||||
*/
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user