Allow enabling AMQP heartbeat and it's interval

This commit is contained in:
21stcaveman 2022-08-08 15:00:41 -07:00
parent ac75618be3
commit 7209ff1a4b

View File

@ -4,6 +4,7 @@
*
* Copyright 2012-2013 Vaclav Tomec
* Copyright 2014 Rainer Gerhards
* Copyright 2022 Hamid Maadani
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@ -22,7 +23,7 @@
* Author: Vaclav Tomec
* <vaclav.tomec@gmail.com>
*
* TLS Support added by:
* TLS & AMQP heartbeat support added by:
* Hamid Maadani
* <hamid@dexo.tech>
*
@ -140,6 +141,7 @@ typedef struct _instanceData {
int initOpenSSL; /* should rabbitmq-c initialize OpenSSL? */
int verifyPeer; /* should peer be verified for TLS? */
int verifyHostname; /* should hostname be verified for TLS? */
int heartbeat; /* AMQP heartbeat interval in seconds (0 means disabled, which is default) */
char *caCert; /* CA certificate to be used for TLS connection */
recover_t recover_policy;
@ -186,6 +188,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "host", eCmdHdlrString, 0 },
{ "port", eCmdHdlrInt, 0 },
{ "virtual_host", eCmdHdlrGetWord, 0 },
{ "heartbeat_interval", eCmdHdlrNonNegInt, 0 },
{ "user", eCmdHdlrGetWord, 0 },
{ "password", eCmdHdlrGetWord, 0 },
{ "ssl", eCmdHdlrBinary, 0 },
@ -274,7 +277,7 @@ static int amqp_authenticate(wrkrInstanceData_t *self, amqp_connection_state_t a
int frame_size = (glbl.GetMaxLine(runConf)<130000) ? 131072 : (glbl.GetMaxLine(runConf)+1072);
/* authenticate */
ret = amqp_login(a_conn, (char const *)self->pData->vhost, 1, frame_size, 0,
ret = amqp_login(a_conn, (char const *)self->pData->vhost, 1, frame_size, self->pData->heartbeat,
AMQP_SASL_METHOD_PLAIN, self->pData->user, self->pData->password);
if (ret.reply_type != AMQP_RESPONSE_NORMAL)
@ -973,6 +976,7 @@ CODESTARTcreateInstance
pData->verifyPeer = 0;
pData->verifyHostname = 0;
pData->caCert = NULL;
pData->heartbeat = 0;
ENDcreateInstance
BEGINfreeInstance
@ -1035,6 +1039,7 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tverify_peer=%d\n", pData->verifyPeer);
dbgprintf("\tverify_hostname=%d\n", pData->verifyHostname);
dbgprintf("\tca_cert='%s'\n", pData->caCert);
dbgprintf("\theartbeat_interval=%d\n", pData->heartbeat);
dbgprintf("\texchange='%*s'\n", (int)pData->exchange.len,
(char*)pData->exchange.bytes);
@ -1090,6 +1095,8 @@ CODESTARTnewActInst
pData->ssl = (int) pvals[i].val.d.n;
} else if (!strcmp(actpblk.descr[i].name, "ca_cert")) {
pData->caCert = (char*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if (!strcmp(actpblk.descr[i].name, "heartbeat_interval")) {
pData->heartbeat = (int) pvals[i].val.d.n;
} else if (!strcmp(actpblk.descr[i].name, "init_openssl")) {
pData->initOpenSSL = (int) pvals[i].val.d.n;
} else if (!strcmp(actpblk.descr[i].name, "verify_peer")) {