mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-17 18:40:42 +01:00
Merge pull request #2091 from allanpark/Issue-2090-lmsig_ksils12_libksi_async_mode
Issue 2090: Add possibility to use the asyncronous mode of the libksi (>=3.16)
This commit is contained in:
commit
03d99f1238
@ -1300,7 +1300,7 @@ AC_ARG_ENABLE(ksi-ls12,
|
||||
[enable_ksi_ls12=no]
|
||||
)
|
||||
if test "x$enable_ksi_ls12" = "xyes"; then
|
||||
PKG_CHECK_MODULES(GT_KSI_LS12, libksi >= 3.13.0)
|
||||
PKG_CHECK_MODULES(GT_KSI_LS12, libksi >= 3.16.0)
|
||||
fi
|
||||
AM_CONDITIONAL(ENABLE_KSI_LS12, test x$enable_ksi_ls12 = xyes)
|
||||
|
||||
|
||||
@ -73,6 +73,15 @@ size_t RingBuffer_count(RingBuffer* this) {
|
||||
return this->count;
|
||||
}
|
||||
|
||||
bool RingBuffer_getItem(RingBuffer* this, size_t index, void** item) {
|
||||
if (this->count == 0 || index >= this->count)
|
||||
return false;
|
||||
|
||||
*item = this->buffer[(this->head + index) % this->size];
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
ProtectedQueue* ProtectedQueue_new(size_t queueSize) {
|
||||
ProtectedQueue *p = calloc(1, sizeof (ProtectedQueue));
|
||||
if (!p)
|
||||
@ -89,6 +98,7 @@ void ProtectedQueue_free(ProtectedQueue* this) {
|
||||
pthread_cond_destroy(&this->condition);
|
||||
this->bStop = true;
|
||||
RingBuffer_free(this->workItems);
|
||||
free(this);
|
||||
}
|
||||
|
||||
/// Signal stop. All threads waiting in FetchItme will be returned false from FetchItem
|
||||
@ -139,6 +149,14 @@ size_t ProtectedQueue_popFrontBatch(ProtectedQueue* this, void** items, size_t b
|
||||
return i;
|
||||
}
|
||||
|
||||
bool ProtectedQueue_getItem(ProtectedQueue* this, size_t index, void** item) {
|
||||
bool ret=false;
|
||||
pthread_mutex_lock(&this->mutex);
|
||||
ret=RingBuffer_getItem(this->workItems, index, item);
|
||||
pthread_mutex_unlock(&this->mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Waits for a new work item or timeout (if specified). Returns 0 in case of exit
|
||||
condition, 1 if item became available and ETIMEDOUT in case of timeout. */
|
||||
int ProtectedQueue_waitForItem(ProtectedQueue* this, void** item, uint64_t timeout) {
|
||||
@ -151,19 +169,16 @@ int ProtectedQueue_waitForItem(ProtectedQueue* this, void** item, uint64_t timeo
|
||||
ts.tv_nsec += (timeout % 1000LL)*1000LL;
|
||||
}
|
||||
|
||||
while (RingBuffer_count(this->workItems) == 0) {
|
||||
if (timeout) {
|
||||
if (pthread_cond_timedwait(&this->condition, &this->mutex, &ts) == ETIMEDOUT) {
|
||||
pthread_mutex_unlock(&this->mutex);
|
||||
return ETIMEDOUT;
|
||||
}
|
||||
} else
|
||||
pthread_cond_wait(&this->condition, &this->mutex);
|
||||
|
||||
if (this->bStop) {
|
||||
if (timeout) {
|
||||
if (pthread_cond_timedwait(&this->condition, &this->mutex, &ts) == ETIMEDOUT) {
|
||||
pthread_mutex_unlock(&this->mutex);
|
||||
return 0;
|
||||
return ETIMEDOUT;
|
||||
}
|
||||
} else
|
||||
pthread_cond_wait(&this->condition, &this->mutex);
|
||||
if (this->bStop) {
|
||||
pthread_mutex_unlock(&this->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (RingBuffer_count(this->workItems) != 0 && item != NULL)
|
||||
|
||||
@ -20,6 +20,7 @@ void RingBuffer_free(RingBuffer* this);
|
||||
bool RingBuffer_pushBack(RingBuffer* this, void* item);
|
||||
bool RingBuffer_popFront(RingBuffer* this, void** item);
|
||||
bool RingBuffer_peekFront(RingBuffer* this, void** item);
|
||||
bool RingBuffer_getItem(RingBuffer* this, size_t index, void** item);
|
||||
size_t RingBuffer_count(RingBuffer* this);
|
||||
|
||||
typedef struct ProtectedQueue_st {
|
||||
@ -38,6 +39,7 @@ bool ProtectedQueue_popFront(ProtectedQueue* this, void** item);
|
||||
size_t ProtectedQueue_popFrontBatch(ProtectedQueue* this, void** items, size_t bufSize);
|
||||
int ProtectedQueue_waitForItem(ProtectedQueue* this, void** item, uint64_t timeout);
|
||||
size_t ProtectedQueue_count(ProtectedQueue* this);
|
||||
bool ProtectedQueue_getItem(ProtectedQueue* this, size_t index, void** item);
|
||||
|
||||
typedef struct WorkerThreadContext_st {
|
||||
bool (*workerFunc)(void*);
|
||||
|
||||
@ -55,6 +55,9 @@
|
||||
#include <ksi/ksi.h>
|
||||
#include <ksi/tlv_element.h>
|
||||
#include <ksi/hash.h>
|
||||
#include <ksi/net_async.h>
|
||||
#include <ksi/net_uri.h>
|
||||
#include <ksi/signature_builder.h>
|
||||
#include "lib_ksils12.h"
|
||||
#include "lib_ksi_queue.h"
|
||||
|
||||
@ -65,6 +68,8 @@ typedef unsigned char uchar;
|
||||
|
||||
#define KSI_BUF_SIZE 4096
|
||||
|
||||
#define debug_report(ctx, args...) do { if(ctx->debug>0) report(ctx, args); } while(0)
|
||||
|
||||
static const char *blockFileSuffix = ".logsig.parts/blocks.dat";
|
||||
static const char *sigFileSuffix = ".logsig.parts/block-signatures.dat";
|
||||
static const char *ls12FileSuffix = ".logsig";
|
||||
@ -74,6 +79,7 @@ static const char *blockCloseReason = "com.guardtime.blockCloseReason";
|
||||
#define LS12_FILE_HEADER "LOGSIG12"
|
||||
#define LS12_BLOCKFILE_HEADER "LOG12BLK"
|
||||
#define LS12_SIGFILE_HEADER "LOG12SIG"
|
||||
#define LS12_SIGNATURE_TIMEOUT 60
|
||||
|
||||
/* Worker queue item type identifier */
|
||||
typedef enum QITEM_type_en {
|
||||
@ -83,12 +89,24 @@ typedef enum QITEM_type_en {
|
||||
QITEM_QUIT
|
||||
} QITEM_type;
|
||||
|
||||
/* Worker queue item status identifier */
|
||||
typedef enum QITEM_status_en {
|
||||
QITEM_WAITING = 0x00,
|
||||
QITEM_SENT,
|
||||
QITEM_DONE,
|
||||
} QITEM_status;
|
||||
|
||||
|
||||
/* Worker queue job item */
|
||||
typedef struct QueueItem_st {
|
||||
QITEM_type type;
|
||||
QITEM_status status;
|
||||
void *arg;
|
||||
uint64_t intarg1;
|
||||
uint64_t intarg2;
|
||||
KSI_AsyncHandle *respHandle;
|
||||
int ksi_status;
|
||||
time_t request_time;
|
||||
} QueueItem;
|
||||
|
||||
bool add_queue_item(rsksictx ctx, QITEM_type type, void *arg, uint64_t intarg1, uint64_t intarg2);
|
||||
@ -628,6 +646,7 @@ rsksiCtxNew(void) {
|
||||
ctx->blockTimeLimit = 0;
|
||||
ctx->bKeepTreeHashes = false;
|
||||
ctx->bKeepRecordHashes = true;
|
||||
ctx->max_requests = (1 << 8);
|
||||
ctx->errFunc = NULL;
|
||||
ctx->usrptr = NULL;
|
||||
ctx->fileUID = -1;
|
||||
@ -640,6 +659,8 @@ rsksiCtxNew(void) {
|
||||
ctx->thread_started = false;
|
||||
ctx->disabled = false;
|
||||
|
||||
debug_report(ctx, "debug: signer plugin loaded");
|
||||
|
||||
/*if (pthread_mutex_init(&ctx->module_lock, 0))
|
||||
report(ctx, "pthread_mutex_init: %s", strerror(errno));
|
||||
ctx->signer_queue = ProtectedQueue_new(10);*/
|
||||
@ -653,6 +674,50 @@ rsksiCtxNew(void) {
|
||||
return ctx;
|
||||
}
|
||||
|
||||
int
|
||||
rsksiInitModule(rsksictx ctx) {
|
||||
int res = 0;
|
||||
KSI_Config *config = NULL;
|
||||
KSI_Integer *ksi_int = NULL;
|
||||
uint64_t tmpInt;
|
||||
|
||||
KSI_CTX_setOption(ctx->ksi_ctx, KSI_OPT_AGGR_HMAC_ALGORITHM, (void*)((size_t)ctx->hmacAlg));
|
||||
|
||||
res = KSI_receiveAggregatorConfig(ctx->ksi_ctx, &config);
|
||||
if(res == KSI_OK) {
|
||||
if (KSI_Config_getMaxRequests(config, &ksi_int) == KSI_OK && ksi_int != NULL) {
|
||||
tmpInt = KSI_Integer_getUInt64(ksi_int);
|
||||
ctx->max_requests=tmpInt;
|
||||
report(ctx, "KSI gateway has reported a max requests value of %lu", tmpInt);
|
||||
}
|
||||
|
||||
ksi_int = NULL;
|
||||
if(KSI_Config_getMaxLevel(config, &ksi_int) == KSI_OK && ksi_int != NULL) {
|
||||
tmpInt = KSI_Integer_getUInt64(ksi_int);
|
||||
report(ctx, "KSI gateway has reported a max level value of %lu", tmpInt);
|
||||
if(ctx->blockLevelLimit > tmpInt) {
|
||||
report(ctx, "Decreasing the configured block level limit from %lu to %lu reported by KSI gateway", ctx->blockLevelLimit, tmpInt);
|
||||
ctx->blockLevelLimit=tmpInt;
|
||||
}
|
||||
else if(tmpInt < 2) {
|
||||
report(ctx, "KSI gateway has reported an invalid level limit value (%lu), plugin disabled", tmpInt);
|
||||
ctx->disabled = true;
|
||||
res = KSI_INVALID_ARGUMENT;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_receiveAggregatorConfig", res);
|
||||
}
|
||||
|
||||
create_signer_thread(ctx);
|
||||
|
||||
done:
|
||||
KSI_Config_free(config);
|
||||
return res;
|
||||
}
|
||||
|
||||
/* either returns ksifile object or NULL if something went wrong */
|
||||
ksifile
|
||||
rsksiCtxOpenFile(rsksictx ctx, unsigned char *logfn)
|
||||
@ -668,7 +733,7 @@ rsksiCtxOpenFile(rsksictx ctx, unsigned char *logfn)
|
||||
/* The thread cannot be be created in rsksiCtxNew because in daemon mode the
|
||||
process forks after rsksiCtxNew and the thread disappears */
|
||||
if (!ctx->thread_started)
|
||||
create_signer_thread(ctx);
|
||||
rsksiInitModule(ctx);
|
||||
|
||||
if ((ksi = rsksifileConstruct(ctx)) == NULL)
|
||||
goto done;
|
||||
@ -728,6 +793,18 @@ rsksiSetHashFunction(rsksictx ctx, char *algName) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
rsksiSetHmacFunction(rsksictx ctx, char *algName) {
|
||||
int id = KSI_getHashAlgorithmByName(algName);
|
||||
if (id == KSI_HASHALG_INVALID) {
|
||||
report(ctx, "HMAC function '%s' unknown - using default", algName);
|
||||
ctx->hmacAlg = KSI_HASHALG_SHA2_256;
|
||||
} else {
|
||||
ctx->hmacAlg = id;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
rsksifileDestruct(ksifile ksi) {
|
||||
int r = 0;
|
||||
@ -748,7 +825,6 @@ rsksifileDestruct(ksifile ksi) {
|
||||
free(ksi->blockfilename);
|
||||
free(ksi->statefilename);
|
||||
free(ksi->ksifilename);
|
||||
free(ksi->IV);
|
||||
ctx->ksi = NULL;
|
||||
free(ksi);
|
||||
|
||||
@ -1068,12 +1144,14 @@ sigblkFinishKSI(ksifile ksi)
|
||||
KSI_DataHash_free(ksi->roots[j]);
|
||||
ksi->roots[j] = NULL;
|
||||
KSI_DataHash_free(rootDel);
|
||||
if(ksi->bKeepTreeHashes)
|
||||
tlvWriteHashKSI(ksi, 0x0903, root);
|
||||
if(ret != 0) goto done; /* checks hash_node_ksi() result! */
|
||||
}
|
||||
}
|
||||
|
||||
//Multiplying leaves count by 2 to account for blinding masks
|
||||
level=sigblkCalcLevel(2 * ksi->nRecords);
|
||||
//Multiplying leaves count by 2 to account for blinding masks
|
||||
level=sigblkCalcLevel(2 * ksi->nRecords);
|
||||
|
||||
//in case of async mode we append the root hash to signer queue
|
||||
if (ksi->ctx->syncMode == LOGSIG_ASYNCHRONOUS) {
|
||||
@ -1090,6 +1168,8 @@ sigblkFinishKSI(ksifile ksi)
|
||||
}
|
||||
|
||||
done:
|
||||
free(ksi->IV);
|
||||
ksi->IV=NULL;
|
||||
ksi->bInBlk = 0;
|
||||
return ret;
|
||||
}
|
||||
@ -1103,7 +1183,6 @@ rsksiSetAggregator(rsksictx ctx, char *uri, char *loginid, char *key) {
|
||||
return KSI_INVALID_ARGUMENT;
|
||||
}
|
||||
|
||||
|
||||
r = KSI_CTX_setAggregator(ctx->ksi_ctx, uri, loginid, key);
|
||||
if(r != KSI_OK) {
|
||||
ctx->disabled = true;
|
||||
@ -1127,8 +1206,12 @@ bool add_queue_item(rsksictx ctx, QITEM_type type, void *arg, uint64_t intarg1,
|
||||
|
||||
qi->arg = arg;
|
||||
qi->type = type;
|
||||
qi->status = QITEM_WAITING;
|
||||
qi->intarg1 = intarg1;
|
||||
qi->intarg2 = intarg2;
|
||||
qi->respHandle = NULL;
|
||||
qi->ksi_status = KSI_UNKNOWN_ERROR;
|
||||
qi->request_time = time(NULL);
|
||||
if (ProtectedQueue_addItem(ctx->signer_queue, qi) == false) {
|
||||
ctx->disabled = true;
|
||||
free(qi);
|
||||
@ -1153,7 +1236,12 @@ static void process_requests(rsksictx ctx, KSI_CTX *ksi_ctx, FILE* outfile) {
|
||||
tlvWriteNoSigLS12(outfile, lastItem->intarg1, lastItem->arg,
|
||||
"Signature request dropped for block, signer queue full");
|
||||
report(ctx, "Signature request dropped for block, signer queue full");
|
||||
|
||||
/* the main thread has to be locked when the hash is freed to avoid a race condition */
|
||||
/* TODO: this need more elegant solution, hash should be detached from creation context*/
|
||||
pthread_mutex_lock(&ctx->module_lock);
|
||||
KSI_DataHash_free(lastItem->arg);
|
||||
pthread_mutex_unlock(&ctx->module_lock);
|
||||
free(lastItem);
|
||||
}
|
||||
|
||||
@ -1191,7 +1279,11 @@ signing_failed:
|
||||
reportKSIAPIErr(ctx, NULL, "tlvWriteKSISigLS12", r);
|
||||
|
||||
if (lastItem != NULL) {
|
||||
/* the main thread has to be locked when the hash is freed to avoid a race condition */
|
||||
/* TODO: this need more elegant solution, hash should be detached from creation context*/
|
||||
pthread_mutex_lock(&ctx->module_lock);
|
||||
KSI_DataHash_free(lastItem->arg);
|
||||
pthread_mutex_unlock(&ctx->module_lock);
|
||||
free(lastItem);
|
||||
}
|
||||
|
||||
@ -1201,6 +1293,169 @@ signing_failed:
|
||||
KSI_free(der);
|
||||
}
|
||||
|
||||
static bool save_response(rsksictx ctx, FILE* outfile, QueueItem *item) {
|
||||
bool ret = false;
|
||||
KSI_Signature *sig = NULL;
|
||||
unsigned char *raw = NULL;
|
||||
size_t raw_len;
|
||||
int res = KSI_OK;
|
||||
|
||||
if(item->respHandle != NULL && item->ksi_status == KSI_OK) {
|
||||
CHECK_KSI_API(KSI_AsyncHandle_getSignature(item->respHandle, &sig), ctx, "KSI_AsyncHandle_getSignature");
|
||||
CHECK_KSI_API(KSI_Signature_serialize(sig, &raw, &raw_len), ctx,
|
||||
"KSI_Signature_serialize");
|
||||
tlvWriteKSISigLS12(outfile, item->intarg1, raw, raw_len);
|
||||
KSI_free(raw);
|
||||
}
|
||||
else {
|
||||
tlvWriteNoSigLS12(outfile, item->intarg1, item->arg, KSI_getErrorString(item->ksi_status));
|
||||
}
|
||||
ret = true;
|
||||
|
||||
cleanup:
|
||||
if(res != KSI_OK)
|
||||
tlvWriteNoSigLS12(outfile, item->intarg1, item->arg, KSI_getErrorString(res));
|
||||
|
||||
KSI_Signature_free(sig);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool process_requests_async(rsksictx ctx, KSI_CTX *ksi_ctx, KSI_AsyncService *as, FILE* outfile) {
|
||||
bool ret = false;
|
||||
QueueItem *item = NULL;
|
||||
int res = KSI_OK, tmpRes;
|
||||
KSI_AsyncHandle *reqHandle = NULL;
|
||||
KSI_AsyncHandle *respHandle = NULL;
|
||||
KSI_AggregationReq *req = NULL;
|
||||
KSI_Integer *level;
|
||||
int state;
|
||||
unsigned i;
|
||||
size_t p;
|
||||
|
||||
KSI_AsyncService_getPendingCount(as, &p);
|
||||
debug_report(ctx, "debug: entering, pending: %zu", p);
|
||||
|
||||
/* Check if there are pending/available responses and associate them with the request items */
|
||||
while(true) {
|
||||
respHandle = NULL;
|
||||
tmpRes=KSI_AsyncService_run(as, &respHandle, &p);
|
||||
if(tmpRes!=KSI_OK)
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_AsyncService_run", tmpRes);
|
||||
|
||||
if (respHandle == NULL) { /* nothing received */
|
||||
debug_report(ctx, "debug: dispatch returned NULL, p = %zu", p);
|
||||
break;
|
||||
}
|
||||
|
||||
state = KSI_ASYNC_STATE_UNDEFINED;
|
||||
|
||||
CHECK_KSI_API(KSI_AsyncHandle_getState(respHandle, &state), ctx, "KSI_AsyncHandle_getState");
|
||||
CHECK_KSI_API(KSI_AsyncHandle_getRequestCtx(respHandle, (const void**)&item), ctx, "KSI_AsyncHandle_getRequestCtx");
|
||||
|
||||
if(item == NULL) { /* must never happen */
|
||||
reportErr(ctx, "KSI_AsyncHandle_getRequestCtx returned NULL as context");
|
||||
continue;
|
||||
}
|
||||
|
||||
if(state == KSI_ASYNC_STATE_RESPONSE_RECEIVED) {
|
||||
item->respHandle = respHandle;
|
||||
item->ksi_status = KSI_OK;
|
||||
}
|
||||
else {
|
||||
KSI_AsyncHandle_getError(respHandle, &item->ksi_status);
|
||||
KSI_AsyncHandle_free(respHandle);
|
||||
}
|
||||
|
||||
debug_report(ctx, "debug: pending_requests: %zu", p);
|
||||
item->status = QITEM_DONE;
|
||||
}
|
||||
|
||||
KSI_AsyncService_getPendingCount(as, &p);
|
||||
|
||||
/* Send all the new requests in the back of the queue to the server */
|
||||
for(i = 0; i < ProtectedQueue_count(ctx->signer_queue); i++) {
|
||||
item = NULL;
|
||||
if(!ProtectedQueue_getItem(ctx->signer_queue, i, (void**)&item) || !item)
|
||||
continue;
|
||||
|
||||
/* ingore non request queue items */
|
||||
if(item->type != QITEM_SIGNATURE_REQUEST)
|
||||
continue;
|
||||
|
||||
/* stop at first processed item */
|
||||
if(item->status != QITEM_WAITING)
|
||||
continue;
|
||||
|
||||
debug_report(ctx, "debug: sending: %u", i);
|
||||
|
||||
CHECK_KSI_API(KSI_AggregationReq_new(ksi_ctx, &req), ctx, "KSI_AggregationReq_new");
|
||||
CHECK_KSI_API(KSI_AggregationReq_setRequestHash((KSI_AggregationReq*)req, KSI_DataHash_ref((KSI_DataHash*)item->arg)), ctx, "KSI_AggregationReq_setRequestHash");
|
||||
CHECK_KSI_API(KSI_Integer_new(ksi_ctx, item->intarg2, &level), ctx, "KSI_Integer_new");
|
||||
CHECK_KSI_API(KSI_AggregationReq_setRequestLevel(req, level), ctx, "KSI_AggregationReq_setRequestLevel");
|
||||
CHECK_KSI_API(KSI_AsyncAggregationHandle_new(ksi_ctx, req, &reqHandle), ctx, "KSI_AsyncAggregationHandle_new");
|
||||
CHECK_KSI_API(KSI_AsyncHandle_setRequestCtx(reqHandle, (void*)item, NULL), ctx, "KSI_AsyncRequest_setRequestContext");
|
||||
state = KSI_AsyncService_addRequest(as, reqHandle); /* this can fail because of throttling */
|
||||
|
||||
if (state == KSI_OK) {
|
||||
debug_report(ctx, "debug: sent: %u", i);
|
||||
item->status = QITEM_SENT;
|
||||
|
||||
tmpRes=KSI_AsyncService_run(as, NULL, NULL);
|
||||
if(tmpRes!=KSI_OK)
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_AsyncService_run", tmpRes);
|
||||
|
||||
} else {
|
||||
KSI_AsyncHandle_free(reqHandle);
|
||||
debug_report(ctx, "debug: Unable to add request, err=%d", item->ksi_status);
|
||||
item->status = QITEM_DONE;
|
||||
item->ksi_status = state;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug_report(ctx, "debug: queue size %zu", ProtectedQueue_count(ctx->signer_queue));
|
||||
|
||||
/* Save all consequent fulfilled responses in the front of the queue to the signature file */
|
||||
while(ProtectedQueue_count(ctx->signer_queue)) {
|
||||
item = NULL;
|
||||
if(!ProtectedQueue_getItem(ctx->signer_queue, 0, (void**)&item))
|
||||
break;
|
||||
|
||||
if(!item) {
|
||||
ProtectedQueue_popFront(ctx->signer_queue, (void**) &item);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* stop at first non request queue item (maybe file close/open, quit) */
|
||||
if(item->type!=QITEM_SIGNATURE_REQUEST)
|
||||
break;
|
||||
|
||||
/* stop at first unfinished queue item because the signatures need to be ordered */
|
||||
if(item->status != QITEM_DONE)
|
||||
break;
|
||||
|
||||
//debug_report(ctx, "debug: saving %u, status %d", (unsigned)item->handle, item->ksi_status);
|
||||
ProtectedQueue_popFront(ctx->signer_queue, (void**) &item);
|
||||
save_response(ctx, outfile, item);
|
||||
|
||||
/* the main thread has to be locked when the hash is freed to avoid a race condition */
|
||||
/* TODO: this need more elegant solution, hash should be detached from creation context*/
|
||||
pthread_mutex_lock(&ctx->module_lock);
|
||||
KSI_DataHash_free(item->arg);
|
||||
KSI_AsyncHandle_free(item->respHandle);
|
||||
free(item);
|
||||
pthread_mutex_unlock(&ctx->module_lock);
|
||||
}
|
||||
|
||||
ret = true;
|
||||
|
||||
cleanup:
|
||||
KSI_AsyncService_getPendingCount(as, &p);
|
||||
debug_report(ctx, "debug: leaving, pending: %zu, q. size %zu, ret: %d", p, ProtectedQueue_count(ctx->signer_queue), ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void *signer_thread(void *arg) {
|
||||
|
||||
rsksictx ctx = (rsksictx) arg;
|
||||
@ -1208,18 +1463,44 @@ void *signer_thread(void *arg) {
|
||||
FILE* ksiFile = NULL;
|
||||
time_t timeout;
|
||||
KSI_CTX *ksi_ctx;
|
||||
KSI_AsyncService *as = NULL;
|
||||
int res = 0;
|
||||
bool ret = false;
|
||||
|
||||
ctx->thread_started = true;
|
||||
debug_report(ctx, "debug: Starting signer thread");
|
||||
|
||||
KSI_CTX_new(&ksi_ctx);
|
||||
KSI_CTX_setAggregator(ksi_ctx, ctx->aggregatorUri, ctx->aggregatorId, ctx->aggregatorKey);
|
||||
CHECK_KSI_API(KSI_CTX_new(&ksi_ctx), ctx, "KSI_CTX_new");
|
||||
CHECK_KSI_API(KSI_CTX_setAggregator(ksi_ctx, ctx->aggregatorUri, ctx->aggregatorId, ctx->aggregatorKey), ctx, "KSI_CTX_setAggregator");
|
||||
CHECK_KSI_API(KSI_CTX_setOption(ksi_ctx, KSI_OPT_AGGR_HMAC_ALGORITHM, (void*)((size_t)ctx->hmacAlg)), ctx, "KSI_CTX_setOption");
|
||||
|
||||
res = KSI_SigningAsyncService_new(ksi_ctx, &as);
|
||||
if (res != KSI_OK) {
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_SigningAsyncService_new", res);
|
||||
}
|
||||
else {
|
||||
res = KSI_AsyncService_setEndpoint(as, ctx->aggregatorUri, ctx->aggregatorId, ctx->aggregatorKey);
|
||||
if (res != KSI_OK) {
|
||||
//This can fail if the protocol is not supported by async api.
|
||||
//in this case the plugin will fall back to sync api
|
||||
KSI_AsyncService_free(as);
|
||||
as=NULL;
|
||||
} else {
|
||||
res = KSI_AsyncService_setOption(as, KSI_ASYNC_OPT_MAX_REQUEST_COUNT, (void*) (ctx->max_requests));
|
||||
if (res != KSI_OK)
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_AsyncService_setOption(max_request)", res);
|
||||
|
||||
/* ingoring the possible error here */
|
||||
KSI_AsyncService_setOption(as, KSI_ASYNC_OPT_REQUEST_CACHE_SIZE, (void*) (ctx->max_requests * 5));
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
timeout = 1;
|
||||
|
||||
/* Wait for a work item or timeout*/
|
||||
ProtectedQueue_waitForItem(ctx->signer_queue, NULL, timeout * 1000);
|
||||
|
||||
debug_report(ctx, "debug: ProtectedQueue_waitForItem");
|
||||
/* Check for block time limit*/
|
||||
sigblkCheckTimeOut(ctx);
|
||||
|
||||
@ -1227,32 +1508,54 @@ void *signer_thread(void *arg) {
|
||||
if (ProtectedQueue_count(ctx->signer_queue) == 0)
|
||||
continue;
|
||||
|
||||
/* drain all consecutive signature requests from the queue and add
|
||||
* them to aggregation request */
|
||||
while (ProtectedQueue_peekFront(ctx->signer_queue, (void**) &item)
|
||||
&& item->type == QITEM_SIGNATURE_REQUEST) {
|
||||
process_requests(ctx, ksi_ctx, ksiFile);
|
||||
continue;
|
||||
if(as != NULL) {
|
||||
/* in case of asynchronous service check for pending/unsent requests */
|
||||
ret = process_requests_async(ctx, ksi_ctx, as, ksiFile);
|
||||
if(!ret) {
|
||||
// probably fatal error, disable signing, error should be already reported
|
||||
ctx->disabled = true;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* drain all consecutive signature requests from the queue and add
|
||||
* the last one to aggregation request */
|
||||
if (ProtectedQueue_peekFront(ctx->signer_queue, (void**) &item)
|
||||
&& item->type == QITEM_SIGNATURE_REQUEST) {
|
||||
process_requests(ctx, ksi_ctx, ksiFile);
|
||||
}
|
||||
}
|
||||
|
||||
/* if there are sig. requests still in the front, then we have to start over*/
|
||||
if (ProtectedQueue_peekFront(ctx->signer_queue, (void**) &item)
|
||||
&& item->type == QITEM_SIGNATURE_REQUEST)
|
||||
continue;
|
||||
|
||||
/* Handle other types of work items */
|
||||
if (ProtectedQueue_popFront(ctx->signer_queue, (void**) &item) != 0) {
|
||||
if (item->type == QITEM_CLOSE_FILE) {
|
||||
if (ksiFile)
|
||||
if (ksiFile) {
|
||||
fclose(ksiFile);
|
||||
debug_report(ctx, "debug: sig-thread closing file %p", ksiFile);
|
||||
}
|
||||
ksiFile = NULL;
|
||||
debug_report(ctx, "debug: sig-thread closing file %p", ksiFile);
|
||||
} else if (item->type == QITEM_NEW_FILE) {
|
||||
debug_report(ctx, "debug: sig-thread opening new file %p", item->arg);
|
||||
ksiFile = (FILE*) item->arg;
|
||||
} else if (item->type == QITEM_QUIT) {
|
||||
if (ksiFile)
|
||||
fclose(ksiFile);
|
||||
free(item);
|
||||
break;
|
||||
debug_report(ctx, "debug: sig-thread quitting");
|
||||
goto cleanup;
|
||||
}
|
||||
free(item);
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
KSI_AsyncService_free(as);
|
||||
KSI_CTX_free(ksi_ctx);
|
||||
ctx->thread_started = false;
|
||||
return NULL;
|
||||
|
||||
@ -35,6 +35,13 @@
|
||||
/* check return state of operation and abort, if non-OK */
|
||||
#define CHKr(code) if((r = code) != 0) goto done
|
||||
|
||||
/* check the return value of a ksi api call and log a message in case of error */
|
||||
#define CHECK_KSI_API(code, context, msg) if((res = code) != 0) do { \
|
||||
reportKSIAPIErr(context, NULL, msg, res); \
|
||||
goto cleanup; \
|
||||
} while (0)
|
||||
|
||||
|
||||
typedef enum LOGSIG_SyncMode_en {
|
||||
/** The block hashes and ksi signatures in one file */
|
||||
LOGSIG_ASYNCHRONOUS = 0x00,
|
||||
@ -61,6 +68,7 @@ struct rsksictx_s {
|
||||
KSI_CTX *ksi_ctx; /* libksi's context object */
|
||||
KSI_DataHasher *hasher;
|
||||
KSI_HashAlgorithm hashAlg;
|
||||
KSI_HashAlgorithm hmacAlg;
|
||||
uint8_t bKeepRecordHashes;
|
||||
uint8_t bKeepTreeHashes;
|
||||
uint64_t blockLevelLimit;
|
||||
@ -82,6 +90,8 @@ struct rsksictx_s {
|
||||
bool thread_started;
|
||||
uint8_t disabled; /* permits to disable the plugin --> set to 1 */
|
||||
ksifile ksi;
|
||||
bool debug;
|
||||
uint64_t max_requests;
|
||||
void (*errFunc)(void *, unsigned char*);
|
||||
void (*logFunc)(void *, unsigned char*);
|
||||
void *usrptr; /* for error function */
|
||||
@ -173,12 +183,12 @@ struct rsksistatefile {
|
||||
#define rsksiSetDirGID(ctx, val) ((ctx)->dirGID = val)
|
||||
#define rsksiSetCreateMode(ctx, val) ((ctx)->fCreateMode= val)
|
||||
#define rsksiSetDirCreateMode(ctx, val) ((ctx)->fDirCreateMode = val)
|
||||
|
||||
#define rsksiSetDebug(ctx, val) ((ctx)->debug = val)
|
||||
|
||||
int rsksiSetAggregator(rsksictx ctx, char *uri, char *loginid, char *key);
|
||||
int rsksiSetHashFunction(rsksictx ctx, char *algName);
|
||||
int rsksiInit(char *usragent);
|
||||
void rsksiExit(void);
|
||||
int rsksiSetHmacFunction(rsksictx ctx, char *algName);
|
||||
int rsksiInitModule(rsksictx ctx);
|
||||
rsksictx rsksiCtxNew(void);
|
||||
void rsksisetErrFunc(rsksictx ctx, void (*func)(void*, unsigned char *), void *usrptr);
|
||||
void rsksisetLogFunc(rsksictx ctx, void (*func)(void*, unsigned char *), void *usrptr);
|
||||
|
||||
@ -47,6 +47,7 @@ static struct cnfparamdescr cnfpdescr[] = {
|
||||
{ "sig.aggregator.url", eCmdHdlrGetWord, CNFPARAM_REQUIRED},
|
||||
{ "sig.aggregator.user", eCmdHdlrGetWord, CNFPARAM_REQUIRED},
|
||||
{ "sig.aggregator.key", eCmdHdlrGetWord, CNFPARAM_REQUIRED},
|
||||
{ "sig.aggregator.hmacAlg", eCmdHdlrGetWord, 0 },
|
||||
{ "sig.block.levelLimit", eCmdHdlrSize, CNFPARAM_REQUIRED},
|
||||
{ "sig.block.timeLimit", eCmdHdlrInt, 0},
|
||||
{ "sig.keeprecordhashes", eCmdHdlrBinary, 0 },
|
||||
@ -54,6 +55,7 @@ static struct cnfparamdescr cnfpdescr[] = {
|
||||
{ "sig.fileformat", eCmdHdlrString, 0},
|
||||
{ "sig.syncmode", eCmdHdlrString, 0},
|
||||
{ "sig.randomsource", eCmdHdlrString, 0},
|
||||
{ "sig.debug", eCmdHdlrInt, 0},
|
||||
{ "dirowner", eCmdHdlrUID, 0}, /* legacy: dirowner */
|
||||
{ "dirownernum", eCmdHdlrInt, 0 }, /* legacy: dirownernum */
|
||||
{ "dirgroup", eCmdHdlrGID, 0 }, /* legacy: dirgroup */
|
||||
@ -119,7 +121,8 @@ ENDobjDestruct(lmsig_ksi_ls12)
|
||||
static rsRetVal
|
||||
SetCnfParam(void *pT, struct nvlst *lst)
|
||||
{
|
||||
char *ag_uri = NULL, *ag_loginid = NULL, *ag_key = NULL, *hash=NULL;
|
||||
char *ag_uri = NULL, *ag_loginid = NULL, *ag_key = NULL;
|
||||
char *hash=NULL, *hmac = NULL;
|
||||
lmsig_ksi_ls12_t *pThis = (lmsig_ksi_ls12_t*) pT;
|
||||
int i;
|
||||
uchar *cstr;
|
||||
@ -147,6 +150,8 @@ SetCnfParam(void *pT, struct nvlst *lst)
|
||||
ag_loginid = es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if (!strcmp(pblk.descr[i].name, "sig.aggregator.key")) {
|
||||
ag_key = es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if(!strcmp(pblk.descr[i].name, "sig.aggregator.hmacAlg")) {
|
||||
hmac = (char*) es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
} else if (!strcmp(pblk.descr[i].name, "sig.block.levelLimit")) {
|
||||
if (pvals[i].val.d.n < 2) {
|
||||
errmsg.LogError(0, RS_RET_ERR, "sig.block.levelLimit "
|
||||
@ -177,6 +182,8 @@ SetCnfParam(void *pT, struct nvlst *lst)
|
||||
cstr = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
|
||||
rsksiSetRandomSource(pThis->ctx, (char*) cstr);
|
||||
free(cstr);
|
||||
} else if (!strcmp(pblk.descr[i].name, "sig.debug")) {
|
||||
rsksiSetDebug(pThis->ctx, pvals[i].val.d.n);
|
||||
} else if (!strcmp(pblk.descr[i].name, "dirowner")) {
|
||||
rsksiSetDirUID(pThis->ctx, pvals[i].val.d.n);
|
||||
} else if (!strcmp(pblk.descr[i].name, "dirownernum")) {
|
||||
@ -206,6 +213,9 @@ SetCnfParam(void *pT, struct nvlst *lst)
|
||||
if(rsksiSetHashFunction(pThis->ctx, hash ? hash : (char*) "SHA2-256") != KSI_OK)
|
||||
goto finalize_it;
|
||||
|
||||
if(rsksiSetHmacFunction(pThis->ctx, hmac ? hmac : (char*) "SHA2-256") != KSI_OK)
|
||||
goto finalize_it;
|
||||
|
||||
if(rsksiSetAggregator(pThis->ctx, ag_uri, ag_loginid, ag_key) != KSI_OK)
|
||||
goto finalize_it;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user