rsyslog/tests/miniamqpsrvr.c
2019-04-08 12:11:33 +02:00

659 lines
19 KiB
C

/* a very simplistic tcp receiver for the rsyslog testbench.
*
* Author Philippe Duveau
*
* This file is contribution of the rsyslog project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#if defined(__FreeBSD__)
#include <netinet/in.h>
#endif
#include "rsyslog.h"
#include <amqp.h>
#include <amqp_framing.h>
#define AMQP_STARTING ((uchar)0x10)
#define AMQP_STOP ((uchar)0x00)
#define AMQP_BEHAVIOR_STANDARD 1
#define AMQP_BEHAVIOR_NOEXCH 2
#define AMQP_BEHAVIOR_DECEXCH 3
#define AMQP_BEHAVIOR_BADEXCH 4
uchar connection_start[487] = {
0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xDF, 0x00, 0x0A, 0x00, 0x0A, 0x00, 0x09, 0x00, 0x00, 0x01,
0xBA, 0x0C, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6C, 0x69, 0x74, 0x69, 0x65, 0x73, 0x46, 0x00,
0x00, 0x00, 0xC7, 0x12, 0x70, 0x75, 0x62, 0x6C, 0x69, 0x73, 0x68, 0x65, 0x72, 0x5F, 0x63, 0x6F,
0x6E, 0x66, 0x69, 0x72, 0x6D, 0x73, 0x74, 0x01, 0x1A, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67,
0x65, 0x5F, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x62, 0x69, 0x6E, 0x64, 0x69,
0x6E, 0x67, 0x73, 0x74, 0x01, 0x0A, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2E, 0x6E, 0x61, 0x63, 0x6B,
0x74, 0x01, 0x16, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x63, 0x61, 0x6E, 0x63,
0x65, 0x6C, 0x5F, 0x6E, 0x6F, 0x74, 0x69, 0x66, 0x79, 0x74, 0x01, 0x12, 0x63, 0x6F, 0x6E, 0x6E,
0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x2E, 0x62, 0x6C, 0x6F, 0x63, 0x6B, 0x65, 0x64, 0x74, 0x01,
0x13, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x70, 0x72, 0x69, 0x6F, 0x72, 0x69,
0x74, 0x69, 0x65, 0x73, 0x74, 0x01, 0x1C, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6E, 0x74, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6F, 0x6E, 0x5F, 0x66, 0x61, 0x69, 0x6C, 0x75, 0x72, 0x65, 0x5F, 0x63, 0x6C,
0x6F, 0x73, 0x65, 0x74, 0x01, 0x10, 0x70, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D,
0x65, 0x72, 0x5F, 0x71, 0x6F, 0x73, 0x74, 0x01, 0x0F, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5F,
0x72, 0x65, 0x70, 0x6C, 0x79, 0x5F, 0x74, 0x6F, 0x74, 0x01, 0x0C, 0x63, 0x6C, 0x75, 0x73, 0x74,
0x65, 0x72, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x53, 0x00, 0x00, 0x00, 0x0D, 0x72, 0x61, 0x62, 0x62,
0x69, 0x74, 0x40, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x09, 0x63, 0x6F, 0x70, 0x79, 0x72, 0x69,
0x67, 0x68, 0x74, 0x53, 0x00, 0x00, 0x00, 0x2E, 0x43, 0x6F, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68,
0x74, 0x20, 0x28, 0x43, 0x29, 0x20, 0x32, 0x30, 0x30, 0x37, 0x2D, 0x32, 0x30, 0x31, 0x36, 0x20,
0x50, 0x69, 0x76, 0x6F, 0x74, 0x61, 0x6C, 0x20, 0x53, 0x6F, 0x66, 0x74, 0x77, 0x61, 0x72, 0x65,
0x2C, 0x20, 0x49, 0x6E, 0x63, 0x2E, 0x0B, 0x69, 0x6E, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x69,
0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00, 0x35, 0x4C, 0x69, 0x63, 0x65, 0x6E, 0x73, 0x65, 0x64, 0x20,
0x75, 0x6E, 0x64, 0x65, 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, 0x4D, 0x50, 0x4C, 0x2E, 0x20, 0x20,
0x53, 0x65, 0x65, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2E, 0x72,
0x61, 0x62, 0x62, 0x69, 0x74, 0x6D, 0x71, 0x2E, 0x63, 0x6F, 0x6D, 0x2F, 0x08, 0x70, 0x6C, 0x61,
0x74, 0x66, 0x6F, 0x72, 0x6D, 0x53, 0x00, 0x00, 0x00, 0x0A, 0x45, 0x72, 0x6C, 0x61, 0x6E, 0x67,
0x2F, 0x4F, 0x54, 0x50, 0x07, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, 0x74, 0x53, 0x00, 0x00, 0x00,
0x08, 0x52, 0x61, 0x62, 0x62, 0x69, 0x74, 0x4D, 0x51, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6F,
0x6E, 0x53, 0x00, 0x00, 0x00, 0x05, 0x33, 0x2E, 0x36, 0x2E, 0x32, 0x00, 0x00, 0x00, 0x0E, 0x41,
0x4D, 0x51, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x20, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x00, 0x00, 0x00,
0x05, 0x65, 0x6E, 0x5F, 0x55, 0x53, 0xCE
};
static uchar connection_tune[20] = {
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0C, 0x00, 0x0A, 0x00, 0x1E, 0x00, 0x00, 0x00, 0x02, 0x00,
0x00, 0x00, 0x3C, 0xCE
};
static uchar connection_open_ok[13] = {
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x0A, 0x00, 0x29, 0x00, 0xCE
};
static uchar channel_open_ok[16] = {
0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x14, 0x00, 0x0B, 0x00, 0x00, 0x00, 0x00, 0xCE
};
static uchar exchange_declare_ok[12] = {
0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x28, 0x00, 0x0B, 0xCE
};
static uchar channel_close_ok[12] = {
0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x14, 0x00, 0x29, 0xCE
};
static uchar connection_close_ok[12] = {
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x0A, 0x00, 0x33, 0xCE
};
static uchar channel_close_ok_on_badexch[148] = {
0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x8C, 0x00, 0x14, 0x00, 0x28, 0x01,
0x96, 0x81, 0x50, 0x52, 0x45, 0x43, 0x4F, 0x4E, 0x44, 0x49, 0x54, 0x49,
0x4F, 0x4E, 0x5F, 0x46, 0x41, 0x49, 0x4C, 0x45, 0x44, 0x20, 0x2D, 0x20,
0x69, 0x6E, 0x65, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6C, 0x65, 0x6E, 0x74,
0x20, 0x61, 0x72, 0x67, 0x20, 0x27, 0x64, 0x75, 0x72, 0x61, 0x62, 0x6C,
0x65, 0x27, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x65, 0x78, 0x63, 0x68, 0x61,
0x6E, 0x67, 0x65, 0x20, 0x27, 0x69, 0x6E, 0x27, 0x20, 0x69, 0x6E, 0x20,
0x76, 0x68, 0x6F, 0x73, 0x74, 0x20, 0x27, 0x2F, 0x6D, 0x65, 0x74, 0x72,
0x6F, 0x6C, 0x6F, 0x67, 0x69, 0x65, 0x27, 0x3A, 0x20, 0x72, 0x65, 0x63,
0x65, 0x69, 0x76, 0x65, 0x64, 0x20, 0x27, 0x66, 0x61, 0x6C, 0x73, 0x65,
0x27, 0x20, 0x62, 0x75, 0x74, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6E,
0x74, 0x20, 0x69, 0x73, 0x20, 0x27, 0x74, 0x72, 0x75, 0x65, 0x27, 0x00,
0x28, 0x00, 0x0A, 0xCE
};
typedef struct {
uchar type;
ushort ch;
uint32_t method;
uint16_t header_flags;
size_t datalen;
size_t framelen;
uchar *data;
} amqp_frame_type_t;
#define DBGPRINTF0(f, ...) if (debug>0) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL);\
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
(int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
}
#define DBGPRINTF1(f, ...) if (debug>0) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL);\
dbgtv.tv_sec -= dbgtv_base.tv_sec; \
dbgtv.tv_usec -= dbgtv_base.tv_usec; \
if (dbgtv.tv_usec < 0) { \
dbgtv.tv_usec += 1000000; \
dbgtv.tv_sec--; \
} \
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
(int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
}
#define DBGPRINTF2(f, ...) if (debug==2) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL);\
dbgtv.tv_sec -= dbgtv_base.tv_sec; \
dbgtv.tv_usec -= dbgtv_base.tv_usec; \
if (dbgtv.tv_usec < 0) { \
dbgtv.tv_usec += 1000000; \
dbgtv.tv_sec--; \
} \
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), \
(int)(dbgtv.tv_usec/1000), __VA_ARGS__); \
}
static struct timeval dbgtv_base;
static int server_behaviors = 0;
static int behaviors;
static int wait_after_accept = 200; /* milliseconds */
static char *outfile = NULL;
static int debug = 1;
FILE* fpout = NULL;
static ATTR_NORETURN void
errout(const char *reason, int server)
{
char txt[256];
snprintf(txt,256,"%s server %d", reason, server);
perror(txt);
if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; }
if (outfile) unlink(outfile);
exit(1);
}
static ATTR_NORETURN void
usage(void)
{
fprintf(stderr, "usage: minirmqsrvr -f outfile [-b behaviour] "
"[-t keep_alive_max] [-w delay_after_fail] [-d]\n");
exit (1);
}
/* Those three functions are "endianess" insensitive */
static uint16_t buf2uint16(uchar*b) {
return ((uint16_t)b[0]) << 8 | ((uint16_t)b[1]);
}
static uint32_t buf2uint32(uchar*b) {
return ((uint32_t)b[0]) << 24 | ((uint32_t)b[1]) << 16 | ((uint32_t)b[2]) << 8 | ((uint32_t)b[3]);
}
static uint64_t buf2uint64(uchar*b) {
return ((uint64_t)b[0]) << 56 | ((uint64_t)b[1]) << 48 | ((uint64_t)b[2]) << 40 | ((uint64_t)b[3]) << 32
| ((uint64_t)b[4]) << 24 | ((uint64_t)b[5]) << 16 | ((uint64_t)b[6]) << 8 | ((uint64_t)b[7]);
}
static char AMQP091[8] = { 'A', 'M', 'Q', 'P', 0x00, 0x00, 0x09, 0x01 };
static int
decode_frame_type(uchar *buf, amqp_frame_type_t *frame, size_t nread) {
if (nread == 8){
if (memcmp(buf, AMQP091, sizeof(AMQP091)))
return -1;
frame->framelen = 8;
frame->type = AMQP_STARTING;
frame->ch = 0;
return 0;
}
frame->type = buf[0];
frame->ch = buf2uint16(buf+1);
frame->datalen = buf2uint32(buf+3);
frame->framelen = frame->datalen + 8;
frame->method = buf2uint32(buf+7);
switch (frame->type) {
case AMQP_FRAME_BODY:
frame->data = buf + 7;
break;
default:
frame->data = buf + 11;
}
return 0;
}
static ssize_t
amqp_write(int fdc, uchar *buf, size_t blen, unsigned short channel) {
buf[1] = (char) (channel >> 8);
buf[2] = (char) (channel & 0xFF);
return write(fdc, buf, blen);
}
static uchar *
amqpFieldUint64(uint64_t *d, uchar *s) {
*d = buf2uint64(s);
return s + 8;
}
static uchar *
amqpFieldUint32(uint32_t *d, uchar *s) {
*d = buf2uint32(s);
return s + 4;
}
static uchar *
amqpFieldUint16(uint16_t *d, uchar *s) {
*d = buf2uint16(s);
return s + 2;
}
static uchar *
amqpFieldLenFprintf(const char *pfx, uchar *s, uint32_t len) {
if (fpout)
fprintf(fpout, "%s%.*s", pfx, (int)len, (char*)s);
return s + len;
}
static uchar *
amqpFieldFprintf(const char *pfx, uchar *s) {
uint32_t len = *s++;
return amqpFieldLenFprintf(pfx, s, len);
}
static uchar *
amqpHeaderFprintf(uchar *s, uint32_t *size) {
uint32_t len;
uchar *p = amqpFieldFprintf(", ", s);
p++; /* value type */
p = amqpFieldUint32(&len, p);
*size -= (p - s) + len;
return amqpFieldLenFprintf(":", p, len);
}
static void
amqp_srvr(int port, int srvr, int fds, int piperead, int pipewrite)
{
uchar wrkBuf[8192], *p;
size_t nRead = 0, bsize = 0;
ssize_t nSent;
amqp_frame_type_t frame;
uint64_t body_ui64 = 0;
uint32_t props_header_size;
uint16_t props_flags;
int my_behaviour;
struct timeval tv;
fd_set rfds;
int nfds = ((piperead > fds)? piperead : fds) + 1;
int fdc;
my_behaviour = behaviors & 0x000F;
behaviors = behaviors >> 4; /* for next server */;
if(listen(fds, 0) != 0) errout("listen", port);
DBGPRINTF1("Server AMQP %d on port %d started\n", srvr, port);
tv.tv_sec = 120;
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(fds, &rfds);
if (piperead > 0)
FD_SET(piperead, &rfds);
if (select(nfds,&rfds,NULL,NULL, &tv) == 0) {
exit(1);
}
if (piperead > 0 && FD_ISSET(piperead, &rfds)) {
char c;
int l = read(piperead, &c, 1);
if (l == 1) {
my_behaviour = behaviors & 0x000F;
if (my_behaviour != 0) {
DBGPRINTF1("Server AMQP %d on port %d switch behaviour", srvr, port);
} else {
DBGPRINTF1("Server AMQP %d on port %d leaving", srvr, port);
if (fpout && fpout != stdout) { fclose(fpout); fpout = NULL; }
exit(1);
}
}
}
fdc = accept(fds, NULL, NULL);
if (pipewrite > 0)
nSent = write(pipewrite, "N", 1);
close(fds);
fds = -1;
/* this let the os understand that the port is closed */
usleep(1000 * wait_after_accept);
frame.type = AMQP_STARTING;
while(fdc > 0) {
nSent = 0;
ssize_t rd = 0;
if (nRead < 12) {
rd = read(fdc, wrkBuf + nRead, sizeof(wrkBuf) - nRead);
if (rd <= 0) {
DBGPRINTF1("Server AMQP %d on port %d disconnected\n", srvr, port);
close(fdc);
fdc = 0;
break;
}else {
nRead += (size_t)rd;
}
}
if (decode_frame_type(wrkBuf, &frame, nRead)) {
DBGPRINTF1("Server AMQP %d on port %d killed : bad protocol\n", srvr, port);
close(fdc);
fdc = 0;
break;
}
if (rd > 4)
DBGPRINTF2("Server received : %zd\n", rd);
switch (frame.type) {
case AMQP_STARTING: /* starting handshake */
DBGPRINTF1("Server AMQP %d on port %d type %d connected\n", srvr, port, my_behaviour);
DBGPRINTF2("Server %d connection.start\n", srvr);
nSent = amqp_write(fdc, connection_start, sizeof(connection_start), frame.ch);
break;
case AMQP_FRAME_METHOD:
DBGPRINTF2("Server %d method : 0x%X\n", srvr, frame.method);
switch (frame.method) {
case AMQP_CONNECTION_START_OK_METHOD:
DBGPRINTF2("Server %d connection.tune\n", srvr);
nSent = amqp_write(fdc, connection_tune, sizeof(connection_tune), frame.ch);
break;
case AMQP_CONNECTION_TUNE_OK_METHOD:
DBGPRINTF2("Client %d connection.tune-ok\n", srvr);
nSent = 0;
break;
case AMQP_CONNECTION_OPEN_METHOD:
nSent = amqp_write(fdc, connection_open_ok,
sizeof(connection_open_ok), frame.ch);
DBGPRINTF2("Server %d connection.open\n", srvr);
break;
case AMQP_CHANNEL_OPEN_METHOD:
nSent = amqp_write(fdc, channel_open_ok,
sizeof(channel_open_ok), frame.ch);
DBGPRINTF2("Server %d channel.open\n", srvr);
if (my_behaviour == AMQP_BEHAVIOR_NOEXCH) {
close(fdc);
DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
fdc = 0;
frame.type = 0;
}
break;
case AMQP_EXCHANGE_DECLARE_METHOD:
if (my_behaviour == AMQP_BEHAVIOR_BADEXCH) {
nSent = amqp_write(fdc, channel_close_ok_on_badexch,
sizeof(channel_close_ok_on_badexch), frame.ch);
}else{
nSent = amqp_write(fdc, exchange_declare_ok,
sizeof(exchange_declare_ok), frame.ch);
}
DBGPRINTF2("Server %d exchange.declare\n", srvr);
if (my_behaviour == AMQP_BEHAVIOR_DECEXCH) {
close(fdc);
DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
fdc = 0;
frame.type = 0;
}
break;
case AMQP_CHANNEL_CLOSE_METHOD:
nSent = amqp_write(fdc, channel_close_ok,
sizeof(channel_close_ok), frame.ch);
DBGPRINTF2("Server %d channel.close\n", srvr);
break;
case AMQP_CONNECTION_CLOSE_METHOD:
nSent = amqp_write(fdc, connection_close_ok,
sizeof(connection_close_ok), frame.ch);
DBGPRINTF2("Server %d connection.close\n", srvr);
break;
case AMQP_BASIC_PUBLISH_METHOD:
p = amqpFieldFprintf("Exchange:", frame.data + 2);
amqpFieldFprintf(", routing-key:", p);
break;
default:
nSent = 0;
}
break;
case AMQP_FRAME_HEADER:
DBGPRINTF2("Server %d HEADERS\n", srvr);
p = amqpFieldUint64(&body_ui64, frame.data);
bsize = (size_t)body_ui64;
p = amqpFieldUint16(&props_flags, p);
if (props_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
p = amqpFieldFprintf(", content-type:", p);
}
if (props_flags & AMQP_BASIC_HEADERS_FLAG) {
p = amqpFieldUint32(&props_header_size, p);
while (props_header_size) {
p = amqpHeaderFprintf(p, &props_header_size);
}
}
if (props_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
if (fpout)
fprintf(fpout, ", delivery-mode:%s", (*p++)?"transient":"persistent");
}
if (props_flags & AMQP_BASIC_EXPIRATION_FLAG) {
p = amqpFieldFprintf(", expiration:", p);
}
if (props_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
if (fpout)
fprintf(fpout, ", timestamp:OK");
p += sizeof(uint64_t);
}
if (props_flags & AMQP_BASIC_APP_ID_FLAG) {
amqpFieldFprintf(", app-id:", p);
}
if (fpout)
fprintf(fpout, ", msg:");
break;
case AMQP_FRAME_BODY:
DBGPRINTF2("Server %d Body size left : %zu, received : %zu\n",
srvr, bsize, frame.datalen);
bsize -= frame.datalen;
if (fpout) {
fprintf(fpout, "%.*s", (int)frame.datalen, frame.data);
if (frame.data[frame.datalen-1] != '\n')
fprintf(fpout, "\n");
fflush(fpout);
}
break;
default:
DBGPRINTF1("Server %d unsupported frame type %d\n", srvr, frame.type);
close(fdc);
fdc = 0;
frame.type = 0;
frame.framelen = 0;
} /* switch (frame.type) */
nRead -= frame.framelen;
if (nRead>0)
memmove(wrkBuf, wrkBuf + frame.framelen, nRead);
if (nSent < 0) {
close(fdc);
fdc = 0;
}
} /* while(fdc) */
DBGPRINTF2("Leaving thread %d\n", srvr);
}
int
main(int argc, char *argv[])
{
int port[2], fds[2], i, opt, nb_port = 1;
int pipeS1toS2[2] = { -1, -1 };
int pipeS2toS1[2] = { -1, -1 };
int pipeRead[2], pipeWrite[2];
struct sockaddr_in srvAddr[2];
unsigned int addrLen = sizeof(struct sockaddr_in), len;
pid_t pid[2];
fpout = stdout;
while((opt = getopt(argc, argv, "f:b:w:d")) != -1) {
switch (opt) {
case 'w':
wait_after_accept = atoi(optarg);
break;
case 'd':
debug = 2;
break;
case 'b':
server_behaviors = atoi(optarg);
break;
case 'f':
if(strcmp(optarg, "-")) {
outfile = optarg;
fpout = fopen(optarg, "w");
if(fpout == NULL){
fprintf(stderr, "file %s could not be created\n", outfile);
exit(1);
}
}
break;
default:
fprintf(stderr, "invalid option '%c' or value missing - terminating...\n", opt);
usage();
break;
}
}
switch (server_behaviors) {
case 0:
behaviors = AMQP_BEHAVIOR_STANDARD;
nb_port = 1;
break;
case 1: /* two standard servers get message successfully */
behaviors = AMQP_BEHAVIOR_STANDARD;
nb_port = 2;
break;
case 2: /* 2 servers first server which disconnect after after open channel : no declare exchange */
behaviors = AMQP_BEHAVIOR_NOEXCH | AMQP_BEHAVIOR_STANDARD << 4;
nb_port = 2;
break;
case 3: /* 2 servers first server which disconnect after declare exchange*/
behaviors = AMQP_BEHAVIOR_DECEXCH | AMQP_BEHAVIOR_STANDARD << 4;
nb_port = 2;
break;
case 4: /* one server with bad exchange declare */
behaviors = AMQP_BEHAVIOR_BADEXCH;
nb_port = 1;
break;
default:
fprintf(stderr,"Invalid behavior");
exit(1);
}
gettimeofday(&dbgtv_base, NULL);
port[0] = port[1] = -1;
if (nb_port == 2) {
if(pipe(pipeS1toS2) == -1 || pipe(pipeS2toS1) == -1) {
fprintf(stderr, "Pipe failed !");
exit(1);
}
}
pipeRead[0] = pipeS2toS1[0];
pipeWrite[0] = pipeS1toS2[1];
pipeRead[1] = pipeS1toS2[0];
pipeWrite[1] = pipeS2toS1[1];
for (i = 0; i < nb_port; i++) {
fds[i] = socket(AF_INET, SOCK_STREAM, 0);
srvAddr[i].sin_family = AF_INET;
srvAddr[i].sin_addr.s_addr = INADDR_ANY;
srvAddr[i].sin_port = 0;
if(bind(fds[i], (struct sockaddr *)&srvAddr[i], addrLen) != 0)
errout("bind", 1);
len = addrLen;
if (getsockname(fds[i], (struct sockaddr *)&srvAddr[i], &len) == -1)
errout("bind", i+1);
if ((port[i] = ntohs(srvAddr[i].sin_port)) <= 0)
errout("get port", i+1);
}
for (i = 0; i < nb_port; i++) {
if ((pid[i] = fork()) == -1) {
fprintf(stderr, "Fork failed !");
exit(1);
}
if (pid[i] == 0) {
/* this is the child */
if (fds[1-i] > 0) close(fds[1-i]);
amqp_srvr(port[i], i+1, fds[i], pipeRead[i], pipeWrite[i]);
if (fpout && fpout != stdout) fclose(fpout);
DBGPRINTF2("%s\n","Leaving server");
return 0;
}
}
if (nb_port==2)
printf("export AMQPSRVRPID1=%ld AMQPSRVRPID2=%ld PORT_AMQP1=%d PORT_AMQP2=%d",
(long)pid[0], (long)pid[1], port[0], port[1]);
else
printf("export AMQPSRVRPID1=%ld PORT_AMQP1=%d",
(long)pid[0], port[0]);
return 0;
}