Merge pull request #1325 from taotetek/master

new omczmq features
This commit is contained in:
taotetek 2017-01-04 13:28:24 -05:00 committed by GitHub
commit cb50f28fa6
4 changed files with 207 additions and 60 deletions

View File

@ -1,5 +1,11 @@
ZeroMQ 3.x Input Plugin
DEPRECATION NOTICE
------------------
This plugin is not maintained and is deprecated. For ZeroMQ output support,
please use contrib/omczmq, which is actively developed and maintained - Brian
------------------
Building this plugin:
Requires libzmq and libczmq. First, download the tarballs of both libzmq
and its supporting libczmq from http://download.zeromq.org. As of this

View File

@ -3,27 +3,10 @@ CZMQ Output Plugin
REQUIREMENTS:
* libsodium ( https://github.com/jedisct1/libsodium )
* zeromq v4.x build with libsodium support ( http://zeromq.org/ )
* czmq 3.x ( http://czmq.zeromq.org/ )
* zeromq built with libsodium support ( http://zeromq.org/ )
* czmq ( http://czmq.zeromq.org/ )
-------------------------------------------------------------------------------
module(
load="omczmq"
servercertpath="/etc/curve.d/server"
clientcertpath="/etc/curve.d/"
authtype="CURVESERVER"
authenticator="on"
)
action(
name="to_zeromq"
type="omczmq"
endpoints="tcp://*:24445"
socktype="PUSH"
)
-------------------------------------------------------------------------------
Explanation of Options:
EXPLANATION OF OPTIONS
Module
------
@ -36,5 +19,61 @@ Action
------
type: type of action (omczmq for this plugin)
endpoints: comma delimited list of zeromq endpoints (see zeromq documentation)
socktype: zeromq socket type (currently supports PUSH, PUB, DEALER, RADIO, CLIENT)
authtype: CURVECLIENT or CURVESERVER
socktype: zeromq socket type (currently supports PUSH, PUB, DEALER, RADIO, CLIENT, SCATTER)
sendtimeout: timeout in ms before send errors
sendhwm: number of messages to store in internal buffer before discarding (defaults to 1000)
connecttimeout: connection timeout in ms(requires libzmq 4.2 or higher)
heartbeativl: time in ms between sending heartbeat PING messages (requires libzmq 4.2 or higher)
heartbeattimeout: time in milliseconds to wait for a PING response before disconnect(libzmq 4.2 or higher)
heartbeatttl: time remote peer should wait between PINGs before disconnect (libzmq 4.2 or higher)
topicframe: "on" to send topic as separate frame if PUB socket
topics: comma delimited list of topics or templates to make topics from if PUB or RADIO socket
dynatopic: if "on" topics list is treated as list of template names
template: template to use for message (defaults to RSYSLOG_ForwardFormat)
EXAMPLE CONFIGURATION
This configuration sets up an omczmq endpoint as a ZMQ_PUB socket with CURVE authentication.
Clients whose certificates are in the '/etc/curve.d/allowed_clients/' directory will be
allowed to connect. Each message is published on two topics ( "hostname.programname" and
"programname.hostname" ) which are constructed from properties of the log message.
For instance, a log from sshd from host.example.com will be published on two topics:
* host.example.com.sshd
* sshd.host.example.com
In this configuration, the output is configured to send each message as a two frame
message, with the topic in the first flame and the rsyslog message in the second.
-------------------------------------------------------------------------------
module(
load="omczmq"
servercertpath="/etc/curve.d/example_server"
clientcertpath="/etc/curve.d/allowed_clients"
authtype="CURVESERVER"
authenticator="on"
)
template(name="host_program_topic" type="list") {
property(name="hostname")
constant(value=".")
property(name="programname")
}
template(name="program_host_topic" type="list") {
property(name="programname")
constant(value=".")
property(name="hostname")
}
action(
name="to_zeromq"
type="omczmq"
socktype="PUB"
endpoints="@tcp://*:31338"
topics="host_program_topic,program_host_topic"
dynatopic="on"
topicframe="on"
)
-------------------------------------------------------------------------------

View File

@ -79,8 +79,16 @@ typedef struct _instanceData {
bool sendError;
char *sockEndpoints;
int sockType;
int sendHWM;
#if(CZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MAJOR >=4 && ZMQ_VERSION_MINOR >=2)
int heartbeatIvl;
int heartbeatTimeout;
int heartbeatTTL;
int connectTimeout;
#endif
uchar *tplName;
bool topicFrame;
sbool topicFrame;
sbool dynaTopic;
} instanceData;
typedef struct wrkrInstanceData {
@ -90,10 +98,18 @@ typedef struct wrkrInstanceData {
static struct cnfparamdescr actpdescr[] = {
{ "endpoints", eCmdHdlrGetWord, 1 },
{ "socktype", eCmdHdlrGetWord, 1 },
{ "sendhwm", eCmdHdlrGetWord, 0 },
#if(CZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MAJOR >=4 && ZMQ_VERSION_MINOR >=2)
{ "heartbeatttl", eCmdHdlrGetWord, 0},
{ "heartbeativl", eCmdHdlrGetWord, 0},
{ "heartbeattimeout", eCmdHdlrGetWord, 0},
{ "connecttimeout", eCmdHdlrGetWord, 0},
#endif
{ "sendtimeout", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 0 },
{ "topics", eCmdHdlrGetWord, 0 },
{ "topicframe", eCmdHdlrGetWord, 0}
{ "topicframe", eCmdHdlrGetWord, 0},
{ "dynatopic", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk actpblk = {
@ -112,8 +128,17 @@ static rsRetVal initCZMQ(instanceData* pData) {
pData->sockEndpoints);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
zsock_set_sndtimeo(pData->sock, pData->sendTimeout);
#if(CZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MAJOR >=4 && ZMQ_VERSION_MINOR >=2)
if(pData->heartbeatIvl > 0 && pData->heartbeatTimeout > 0 && pData->heartbeatTTL > 0) {
zsock_set_heartbeat_ivl(pData->sock, pData->heartbeatIvl);
zsock_set_heartbeat_timeout(pData->sock, pData->heartbeatTimeout);
zsock_set_heartbeat_ttl(pData->sock, pData->heartbeatTTL);
}
#endif
if(runModConf->authType) {
if (!strcmp(runModConf->authType, "CURVESERVER")) {
zcert_t *serverCert = zcert_load(runModConf->serverCertPath);
@ -180,40 +205,65 @@ finalize_it:
RETiRet;
}
rsRetVal outputCZMQ(uchar* msg, instanceData* pData) {
rsRetVal outputCZMQ(uchar** ppString, instanceData* pData) {
DEFiRet;
if(NULL == pData->sock) {
CHKiRet(initCZMQ(pData));
}
/* if we are using a PUB (or RADIO) socket and we have a topic list then we
* need some special care and attention */
#if defined(ZMQ_RADIO)
DBGPRINTF("omczmq: ZMQ_RADIO is defined...\n");
if((pData->sockType == ZMQ_PUB || pData->sockType == ZMQ_RADIO) && pData->topics) {
#else
DBGPRINTF("omczmq: ZMQ_RADIO is NOT defined...\n");
if(pData->sockType == ZMQ_PUB && pData->topics) {
#endif
char *topic = zlist_first(pData->topics);
int templateIndex = 1;
const char *topic = (const char *)zlist_first(pData->topics);
while(topic) {
if(pData->topicFrame && pData->sockType == ZMQ_SUB) {
int rc = zstr_sendx(pData->sock, topic, (char*)msg, NULL);
int rc;
/* if dynaTopic is true, the topic is constructed by rsyslog
* by applying the supplied template to the message properties */
if(pData->dynaTopic)
topic = (const char*)ppString[templateIndex];
if (pData->sockType == ZMQ_PUB) {
/* if topicFrame is true, send the topic as a separate zmq frame */
if(pData->topicFrame) {
rc = zstr_sendx(pData->sock, topic, (char*)ppString[0], NULL);
}
/* if topicFrame is false, concatenate the topic with the
* message in the same frame */
else {
rc = zstr_sendf(pData->sock, "%s%s", topic, (char*)ppString[0]);
}
/* if we have a send error notify rsyslog */
if(rc != 0) {
pData->sendError = true;
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
}
#if defined(ZMQ_RADIO)
else if(pData->sockType == ZMQ_RADIO) {
zframe_t *frame = zframe_from((char*)msg);
if(!frame) {
DBGPRINTF("omczmq: sending on RADIO socket...\n");
zframe_t *frame = zframe_from((char*)ppString[0]);
if (!frame) {
DBGPRINTF("omczmq: failed to create frame...\n");
pData->sendError = true;
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
int rc = zframe_set_group(frame, topic);
if(rc != 0) {
rc = zframe_set_group(frame, topic);
if (rc != 0) {
DBGPRINTF("omczmq: failed to set group '%d'...\n", rc);
pData->sendError = true;
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
DBGPRINTF("omczmq: set RADIO group to '%s'\n", topic);
rc = zframe_send(&frame, pData->sock, 0);
if(rc != 0) {
pData->sendError = true;
@ -221,22 +271,21 @@ rsRetVal outputCZMQ(uchar* msg, instanceData* pData) {
}
}
#endif
else {
int rc = zstr_sendf(pData->sock, "%s%s", topic, (char*)msg);
if(rc != 0) {
pData->sendError = true;
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
/* get the next topic from the list, and increment
* our topic index */
topic = zlist_next(pData->topics);
templateIndex++;
}
}
/* we aren't a PUB socket and we don't have a topic list - this means
* we can just send the message using the rsyslog template */
else {
int rc = zstr_send(pData->sock, (char*)msg);
int rc = zstr_send(pData->sock, (char*)ppString[0]);
if(rc != 0) {
pData->sendError = true;
DBGPRINTF("imczmq send error: %d", rc);
DBGPRINTF("omczmq: send error: %d", rc);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
@ -254,7 +303,12 @@ setInstParamDefaults(instanceData* pData) {
pData->sockType = -1;
pData->sendTimeout = -1;
pData->topics = NULL;
pData->topicFrame = true;
pData->topicFrame = false;
#if(CZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MAJOR >=4 && ZMQ_VERSION_MINOR >=2)
pData->heartbeatIvl = 0;
pData->heartbeatTimeout = 0;
pData->heartbeatTTL = 0;
#endif
}
@ -319,7 +373,7 @@ CODESTARTactivateCnf
runModConf = pModConf;
if(runModConf->authenticator == 1) {
if(!authActor) {
DBGPRINTF("imczmq: starting authActor\n");
DBGPRINTF("omczmq: starting authActor\n");
authActor = zactor_new(zauth, NULL);
if(!strcmp(runModConf->clientCertPath, "*")) {
zstr_sendx(authActor, "CURVE", CURVE_ALLOW_ANY, NULL);
@ -338,7 +392,7 @@ CODESTARTfreeCnf
free(pModConf->authType);
free(pModConf->serverCertPath);
free(pModConf->clientCertPath);
DBGPRINTF("imczmq: stopping authActor\n");
DBGPRINTF("omczmq: stopping authActor\n");
zactor_destroy(&authActor);
ENDfreeCnf
@ -354,7 +408,7 @@ CODESTARTsetModCnf
for (i=0; i<modpblk.nParams; ++i) {
if(!pvals[i].bUsed) {
DBGPRINTF("imczmq: pvals[i].bUSed continuing\n");
DBGPRINTF("omczmq: pvals[i].bUSed continuing\n");
continue;
}
if(!strcmp(modpblk.descr[i].name, "authenticator")) {
@ -374,7 +428,7 @@ CODESTARTsetModCnf
}
else {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"imczmq: config error, unknown "
"omczmq: config error, unknown "
"param %s in setModCnf\n",
modpblk.descr[i].name);
}
@ -401,7 +455,7 @@ BEGINdoAction
CODESTARTdoAction
pthread_mutex_lock(&mutDoAct);
pData = pWrkrData->pData;
iRet = outputCZMQ(ppString[0], pData);
iRet = outputCZMQ(ppString, pData);
pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@ -409,6 +463,7 @@ ENDdoAction
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
int iNumTpls;
CODESTARTnewActInst
if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@ -417,8 +472,6 @@ CODESTARTnewActInst
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
CODE_STD_STRING_REQUESTnewActInst(1)
for(i = 0; i < actpblk.nParams; ++i) {
if(!pvals[i].bUsed) {
continue;
@ -426,38 +479,69 @@ CODESTARTnewActInst
if(!strcmp(actpblk.descr[i].name, "endpoints")) {
pData->sockEndpoints = es_str2cstr(pvals[i].val.d.estr, NULL);
DBGPRINTF("omczmq: sockEndPoints set to '%s'\n", pData->sockEndpoints);
}
else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
DBGPRINTF("omczmq: template set to '%s'\n", pData->tplName);
}
else if(!strcmp(actpblk.descr[i].name, "dynatopic")) {
pData->dynaTopic = pvals[i].val.d.n;
DBGPRINTF("omczmq: dynaTopic set to %s\n", pData->dynaTopic ? "true" : "false");
}
else if(!strcmp(actpblk.descr[i].name, "sendtimeout")) {
pData->sendTimeout = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
DBGPRINTF("omczmq: sendTimeout set to %d\n", pData->sendTimeout);
}
else if(!strcmp(actpblk.descr[i].name, "sendhwm")) {
pData->sendTimeout = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
DBGPRINTF("omczmq: sendHWM set to %d\n", pData->sendHWM);
}
#if (CZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MAJOR >=4 && ZMQ_VERSION_MINOR >=2)
else if(!strcmp(actpblk.descr[i].name, "heartbeativl")) {
pData->heartbeatIvl = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
DBGPRINTF("omczmq: heartbeatbeatIvl set to %d\n", pData->heartbeatIvl);
}
else if(!strcmp(actpblk.descr[i].name, "heartbeattimeout")) {
pData->heartbeatTimeout = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
DBGPRINTF("omczmq: heartbeatTimeout set to %d\n", pData->heartbeatTimeout);
}
else if(!strcmp(actpblk.descr[i].name, "heartbeatttl")) {
pData->heartbeatTimeout = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
DBGPRINTF("omczmq: heartbeatTTL set to %d\n", pData->heartbeatTTL);
}
#endif
else if(!strcmp(actpblk.descr[i].name, "socktype")){
char *stringType = es_str2cstr(pvals[i].val.d.estr, NULL);
if(stringType != NULL){
if(!strcmp("PUB", stringType)) {
pData->sockType = ZMQ_PUB;
DBGPRINTF("omczmq: sockType set to ZMQ_PUB\n");
}
#if defined(ZMQ_RADIO)
else if(!strcmp("RADIO", stringType)) {
pData->sockType = ZMQ_RADIO;
DBGPRINTF("omczmq: sockType set to ZMQ_RADIO\n");
}
#endif
else if(!strcmp("PUSH", stringType)) {
pData->sockType = ZMQ_PUSH;
DBGPRINTF("omczmq: sockType set to ZMQ_PUSH\n");
}
#if defined(ZMQ_SCATTER)
else if(!strcmp("SCATTER", stringType)) {
pData->sockType = ZMQ_SCATTER;
DBGPRINTF("omczmq: sockType set to ZMQ_SCATTER\n");
}
#endif
else if(!strcmp("DEALER", stringType)) {
pData->sockType = ZMQ_DEALER;
DBGPRINTF("omczmq: sockType set to ZMQ_DEALER\n");
}
#if defined(ZMQ_CLIENT)
else if(!strcmp("CLIENT", stringType)) {
pData->sockType = ZMQ_DEALER;
pData->sockType = ZMQ_CLIENT;
DBGPRINTF("omczmq: sockType set to ZMQ_CLIENT\n");
}
#endif
free(stringType);
@ -469,17 +553,13 @@ CODESTARTnewActInst
}
}
else if(!strcmp(actpblk.descr[i].name, "topicframe")) {
int tframe = atoi(es_str2cstr(pvals[i].val.d.estr, NULL));
if(tframe == 1) {
pData->topicFrame = true;
}
else {
pData->topicFrame = false;
}
pData->topicFrame = pvals[i].val.d.n;
DBGPRINTF("omczmq: topicFrame set to %s\n", pData->topicFrame ? "true" : "false");
}
else if(!strcmp(actpblk.descr[i].name, "topics")) {
pData->topics = zlist_new();
char *topics = es_str2cstr(pvals[i].val.d.estr, NULL);
DBGPRINTF("omczmq: topics set to %s\n", topics);
char *topics_org = topics;
char topic[256];
if(topics == NULL){
@ -513,6 +593,12 @@ CODESTARTnewActInst
}
}
iNumTpls = 1;
if (pData->dynaTopic) {
iNumTpls = zlist_size (pData->topics) + iNumTpls;
}
CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
if (pData->tplName == NULL) {
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"),
OMSR_NO_RQD_TPL_OPTS));
@ -521,6 +607,16 @@ CODESTARTnewActInst
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
}
i = 1;
if (pData->dynaTopic) {
char *topic = zlist_first(pData->topics);
while (topic) {
CHKiRet(OMSRsetEntry(*ppOMSR, i, (uchar*)strdup(topic), OMSR_NO_RQD_TPL_OPTS));
i++;
topic = zlist_next(pData->topics);
}
}
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst

View File

@ -1,5 +1,11 @@
ZeroMQ 3.x Output Plugin
DEPRECATION NOTICE
------------------
This plugin is not maintained and is deprecated. For ZeroMQ output support,
please use contrib/omczmq, which is actively developed and maintained - Brian
------------------
Building this plugin:
Requires libzmq and libczmq. First, download the tarballs of both libzmq
and its supporting libczmq from http://download.zeromq.org. As of this