rsyslog/tests/miniamqpsrvr.c
Rainer Gerhards b326c76f45 style: normalize C source formatting via clang-format (PoC)
This commit applies the new canonical formatting style using `clang-format` with custom settings (notably 4-space indentation), as part of our shift toward automated formatting normalization.

⚠️ No functional changes are included — only whitespace and layout modifications as produced by `clang-format`.

This change is part of the formatting modernization strategy discussed in:
https://github.com/rsyslog/rsyslog/issues/5747

Key context:
- Formatting is now treated as a disposable view, normalized via tooling.
- The `.clang-format` file defines the canonical style.
- A fixup script (`devtools/format-code.sh`) handles remaining edge cases.
- Formatting commits are added to `.git-blame-ignore-revs` to reduce noise.
- Developers remain free to format code however they prefer locally.
2025-07-16 13:56:21 +02:00

606 lines
24 KiB
C

/* a very simplistic tcp receiver for the rsyslog testbench.
*
* Author Philippe Duveau
*
* This file is contribution of the rsyslog project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#if defined(__FreeBSD__)
#include <netinet/in.h>
#endif
#include "rsyslog.h"
#include <amqp.h>
#include <amqp_framing.h>
#define AMQP_STARTING ((uchar)0x10)
#define AMQP_STOP ((uchar)0x00)
#define AMQP_BEHAVIOR_STANDARD 1
#define AMQP_BEHAVIOR_NOEXCH 2
#define AMQP_BEHAVIOR_DECEXCH 3
#define AMQP_BEHAVIOR_BADEXCH 4
uchar connection_start[487] = {
0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xDF, 0x00, 0x0A, 0x00, 0x0A, 0x00, 0x09, 0x00, 0x00, 0x01, 0xBA, 0x0C, 0x63,
0x61, 0x70, 0x61, 0x62, 0x69, 0x6C, 0x69, 0x74, 0x69, 0x65, 0x73, 0x46, 0x00, 0x00, 0x00, 0xC7, 0x12, 0x70, 0x75,
0x62, 0x6C, 0x69, 0x73, 0x68, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x66, 0x69, 0x72, 0x6D, 0x73, 0x74, 0x01, 0x1A,
0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x5F, 0x62,
0x69, 0x6E, 0x64, 0x69, 0x6E, 0x67, 0x73, 0x74, 0x01, 0x0A, 0x62, 0x61, 0x73, 0x69, 0x63, 0x2E, 0x6E, 0x61, 0x63,
0x6B, 0x74, 0x01, 0x16, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65, 0x72, 0x5F, 0x63, 0x61, 0x6E, 0x63, 0x65, 0x6C,
0x5F, 0x6E, 0x6F, 0x74, 0x69, 0x66, 0x79, 0x74, 0x01, 0x12, 0x63, 0x6F, 0x6E, 0x6E, 0x65, 0x63, 0x74, 0x69, 0x6F,
0x6E, 0x2E, 0x62, 0x6C, 0x6F, 0x63, 0x6B, 0x65, 0x64, 0x74, 0x01, 0x13, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65,
0x72, 0x5F, 0x70, 0x72, 0x69, 0x6F, 0x72, 0x69, 0x74, 0x69, 0x65, 0x73, 0x74, 0x01, 0x1C, 0x61, 0x75, 0x74, 0x68,
0x65, 0x6E, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x5F, 0x66, 0x61, 0x69, 0x6C, 0x75, 0x72, 0x65, 0x5F,
0x63, 0x6C, 0x6F, 0x73, 0x65, 0x74, 0x01, 0x10, 0x70, 0x65, 0x72, 0x5F, 0x63, 0x6F, 0x6E, 0x73, 0x75, 0x6D, 0x65,
0x72, 0x5F, 0x71, 0x6F, 0x73, 0x74, 0x01, 0x0F, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5F, 0x72, 0x65, 0x70, 0x6C,
0x79, 0x5F, 0x74, 0x6F, 0x74, 0x01, 0x0C, 0x63, 0x6C, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5F, 0x6E, 0x61, 0x6D, 0x65,
0x53, 0x00, 0x00, 0x00, 0x0D, 0x72, 0x61, 0x62, 0x62, 0x69, 0x74, 0x40, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x09,
0x63, 0x6F, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x53, 0x00, 0x00, 0x00, 0x2E, 0x43, 0x6F, 0x70, 0x79, 0x72,
0x69, 0x67, 0x68, 0x74, 0x20, 0x28, 0x43, 0x29, 0x20, 0x32, 0x30, 0x30, 0x37, 0x2D, 0x32, 0x30, 0x31, 0x36, 0x20,
0x50, 0x69, 0x76, 0x6F, 0x74, 0x61, 0x6C, 0x20, 0x53, 0x6F, 0x66, 0x74, 0x77, 0x61, 0x72, 0x65, 0x2C, 0x20, 0x49,
0x6E, 0x63, 0x2E, 0x0B, 0x69, 0x6E, 0x66, 0x6F, 0x72, 0x6D, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00,
0x35, 0x4C, 0x69, 0x63, 0x65, 0x6E, 0x73, 0x65, 0x64, 0x20, 0x75, 0x6E, 0x64, 0x65, 0x72, 0x20, 0x74, 0x68, 0x65,
0x20, 0x4D, 0x50, 0x4C, 0x2E, 0x20, 0x20, 0x53, 0x65, 0x65, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77,
0x77, 0x77, 0x2E, 0x72, 0x61, 0x62, 0x62, 0x69, 0x74, 0x6D, 0x71, 0x2E, 0x63, 0x6F, 0x6D, 0x2F, 0x08, 0x70, 0x6C,
0x61, 0x74, 0x66, 0x6F, 0x72, 0x6D, 0x53, 0x00, 0x00, 0x00, 0x0A, 0x45, 0x72, 0x6C, 0x61, 0x6E, 0x67, 0x2F, 0x4F,
0x54, 0x50, 0x07, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, 0x74, 0x53, 0x00, 0x00, 0x00, 0x08, 0x52, 0x61, 0x62, 0x62,
0x69, 0x74, 0x4D, 0x51, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6F, 0x6E, 0x53, 0x00, 0x00, 0x00, 0x05, 0x33, 0x2E,
0x36, 0x2E, 0x32, 0x00, 0x00, 0x00, 0x0E, 0x41, 0x4D, 0x51, 0x50, 0x4C, 0x41, 0x49, 0x4E, 0x20, 0x50, 0x4C, 0x41,
0x49, 0x4E, 0x00, 0x00, 0x00, 0x05, 0x65, 0x6E, 0x5F, 0x55, 0x53, 0xCE};
static uchar connection_tune[20] = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0C, 0x00, 0x0A, 0x00,
0x1E, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x3C, 0xCE};
static uchar connection_open_ok[13] = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x0A, 0x00, 0x29, 0x00, 0xCE};
static uchar channel_open_ok[16] = {0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
0x14, 0x00, 0x0B, 0x00, 0x00, 0x00, 0x00, 0xCE};
static uchar exchange_declare_ok[12] = {0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x28, 0x00, 0x0B, 0xCE};
static uchar channel_close_ok[12] = {0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x14, 0x00, 0x29, 0xCE};
static uchar connection_close_ok[12] = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x0A, 0x00, 0x33, 0xCE};
static uchar channel_close_ok_on_badexch[148] = {
0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x8C, 0x00, 0x14, 0x00, 0x28, 0x01, 0x96, 0x81, 0x50, 0x52, 0x45, 0x43, 0x4F,
0x4E, 0x44, 0x49, 0x54, 0x49, 0x4F, 0x4E, 0x5F, 0x46, 0x41, 0x49, 0x4C, 0x45, 0x44, 0x20, 0x2D, 0x20, 0x69, 0x6E,
0x65, 0x71, 0x75, 0x69, 0x76, 0x61, 0x6C, 0x65, 0x6E, 0x74, 0x20, 0x61, 0x72, 0x67, 0x20, 0x27, 0x64, 0x75, 0x72,
0x61, 0x62, 0x6C, 0x65, 0x27, 0x20, 0x66, 0x6F, 0x72, 0x20, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6E, 0x67, 0x65, 0x20,
0x27, 0x69, 0x6E, 0x27, 0x20, 0x69, 0x6E, 0x20, 0x76, 0x68, 0x6F, 0x73, 0x74, 0x20, 0x27, 0x2F, 0x6D, 0x65, 0x74,
0x72, 0x6F, 0x6C, 0x6F, 0x67, 0x69, 0x65, 0x27, 0x3A, 0x20, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x20,
0x27, 0x66, 0x61, 0x6C, 0x73, 0x65, 0x27, 0x20, 0x62, 0x75, 0x74, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6E, 0x74,
0x20, 0x69, 0x73, 0x20, 0x27, 0x74, 0x72, 0x75, 0x65, 0x27, 0x00, 0x28, 0x00, 0x0A, 0xCE};
typedef struct {
uchar type;
ushort ch;
uint32_t method;
uint16_t header_flags;
size_t datalen;
size_t framelen;
uchar *data;
} amqp_frame_type_t;
#define DBGPRINTF0(f, ...) \
if (debug > 0) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL); \
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), (int)(dbgtv.tv_usec / 1000), __VA_ARGS__); \
}
#define DBGPRINTF1(f, ...) \
if (debug > 0) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL); \
dbgtv.tv_sec -= dbgtv_base.tv_sec; \
dbgtv.tv_usec -= dbgtv_base.tv_usec; \
if (dbgtv.tv_usec < 0) { \
dbgtv.tv_usec += 1000000; \
dbgtv.tv_sec--; \
} \
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), (int)(dbgtv.tv_usec / 1000), __VA_ARGS__); \
}
#define DBGPRINTF2(f, ...) \
if (debug == 2) { \
struct timeval dbgtv; \
gettimeofday(&dbgtv, NULL); \
dbgtv.tv_sec -= dbgtv_base.tv_sec; \
dbgtv.tv_usec -= dbgtv_base.tv_usec; \
if (dbgtv.tv_usec < 0) { \
dbgtv.tv_usec += 1000000; \
dbgtv.tv_sec--; \
} \
fprintf(stderr, "%02d.%03d " f, (int)(dbgtv.tv_sec % 60), (int)(dbgtv.tv_usec / 1000), __VA_ARGS__); \
}
static struct timeval dbgtv_base;
static int server_behaviors = 0;
static int behaviors;
static int wait_after_accept = 200; /* milliseconds */
static char *outfile = NULL;
static int debug = 1;
FILE *fpout = NULL;
static ATTR_NORETURN void errout(const char *reason, int server) {
char txt[256];
snprintf(txt, 256, "%s server %d", reason, server);
perror(txt);
if (fpout && fpout != stdout) {
fclose(fpout);
fpout = NULL;
}
if (outfile) unlink(outfile);
exit(1);
}
static ATTR_NORETURN void usage(void) {
fprintf(stderr,
"usage: minirmqsrvr -f outfile [-b behaviour] "
"[-t keep_alive_max] [-w delay_after_fail] [-d]\n");
exit(1);
}
/* Those three functions are "endianess" insensitive */
static uint16_t buf2uint16(uchar *b) {
return ((uint16_t)b[0]) << 8 | ((uint16_t)b[1]);
}
static uint32_t buf2uint32(uchar *b) {
return ((uint32_t)b[0]) << 24 | ((uint32_t)b[1]) << 16 | ((uint32_t)b[2]) << 8 | ((uint32_t)b[3]);
}
static uint64_t buf2uint64(uchar *b) {
return ((uint64_t)b[0]) << 56 | ((uint64_t)b[1]) << 48 | ((uint64_t)b[2]) << 40 | ((uint64_t)b[3]) << 32 |
((uint64_t)b[4]) << 24 | ((uint64_t)b[5]) << 16 | ((uint64_t)b[6]) << 8 | ((uint64_t)b[7]);
}
static char AMQP091[8] = {'A', 'M', 'Q', 'P', 0x00, 0x00, 0x09, 0x01};
static int decode_frame_type(uchar *buf, amqp_frame_type_t *frame, size_t nread) {
if (nread == 8) {
if (memcmp(buf, AMQP091, sizeof(AMQP091))) return -1;
frame->framelen = 8;
frame->type = AMQP_STARTING;
frame->ch = 0;
return 0;
}
frame->type = buf[0];
frame->ch = buf2uint16(buf + 1);
frame->datalen = buf2uint32(buf + 3);
frame->framelen = frame->datalen + 8;
frame->method = buf2uint32(buf + 7);
switch (frame->type) {
case AMQP_FRAME_BODY:
frame->data = buf + 7;
break;
default:
frame->data = buf + 11;
}
return 0;
}
static ssize_t amqp_write(int fdc, uchar *buf, size_t blen, unsigned short channel) {
buf[1] = (char)(channel >> 8);
buf[2] = (char)(channel & 0xFF);
return write(fdc, buf, blen);
}
static uchar *amqpFieldUint64(uint64_t *d, uchar *s) {
*d = buf2uint64(s);
return s + 8;
}
static uchar *amqpFieldUint32(uint32_t *d, uchar *s) {
*d = buf2uint32(s);
return s + 4;
}
static uchar *amqpFieldUint16(uint16_t *d, uchar *s) {
*d = buf2uint16(s);
return s + 2;
}
static uchar *amqpFieldLenFprintf(const char *pfx, uchar *s, uint32_t len) {
if (fpout) fprintf(fpout, "%s%.*s", pfx, (int)len, (char *)s);
return s + len;
}
static uchar *amqpFieldFprintf(const char *pfx, uchar *s) {
uint32_t len = *s++;
return amqpFieldLenFprintf(pfx, s, len);
}
static uchar *amqpHeaderFprintf(uchar *s, uint32_t *size) {
uint32_t len;
uchar *p = amqpFieldFprintf(", ", s);
p++; /* value type */
p = amqpFieldUint32(&len, p);
*size -= (p - s) + len;
return amqpFieldLenFprintf(":", p, len);
}
static void amqp_srvr(int port, int srvr, int fds, int piperead, int pipewrite) {
uchar wrkBuf[8192], *p;
size_t nRead = 0, bsize = 0;
ssize_t nSent;
amqp_frame_type_t frame;
uint64_t body_ui64 = 0;
uint32_t props_header_size;
uint16_t props_flags;
int my_behaviour;
struct timeval tv;
fd_set rfds;
int nfds = ((piperead > fds) ? piperead : fds) + 1;
int fdc;
my_behaviour = behaviors & 0x000F;
behaviors = behaviors >> 4; /* for next server */
;
if (listen(fds, 0) != 0) errout("listen", port);
DBGPRINTF1("Server AMQP %d on port %d started\n", srvr, port);
tv.tv_sec = 120;
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(fds, &rfds);
if (piperead > 0) FD_SET(piperead, &rfds);
if (select(nfds, &rfds, NULL, NULL, &tv) == 0) {
exit(1);
}
if (piperead > 0 && FD_ISSET(piperead, &rfds)) {
char c;
int l = read(piperead, &c, 1);
if (l == 1) {
my_behaviour = behaviors & 0x000F;
if (my_behaviour != 0) {
DBGPRINTF1("Server AMQP %d on port %d switch behaviour", srvr, port);
} else {
DBGPRINTF1("Server AMQP %d on port %d leaving", srvr, port);
if (fpout && fpout != stdout) {
fclose(fpout);
fpout = NULL;
}
exit(1);
}
}
}
fdc = accept(fds, NULL, NULL);
if (pipewrite > 0) nSent = write(pipewrite, "N", 1);
close(fds);
fds = -1;
/* this let the os understand that the port is closed */
usleep(1000 * wait_after_accept);
frame.type = AMQP_STARTING;
while (fdc > 0) {
nSent = 0;
ssize_t rd = 0;
if (nRead < 12) {
rd = read(fdc, wrkBuf + nRead, sizeof(wrkBuf) - nRead);
if (rd <= 0) {
DBGPRINTF1("Server AMQP %d on port %d disconnected\n", srvr, port);
close(fdc);
fdc = 0;
break;
} else {
nRead += (size_t)rd;
}
}
if (decode_frame_type(wrkBuf, &frame, nRead)) {
DBGPRINTF1("Server AMQP %d on port %d killed : bad protocol\n", srvr, port);
close(fdc);
fdc = 0;
break;
}
if (rd > 4) DBGPRINTF2("Server received : %zd\n", rd);
switch (frame.type) {
case AMQP_STARTING: /* starting handshake */
DBGPRINTF1("Server AMQP %d on port %d type %d connected\n", srvr, port, my_behaviour);
DBGPRINTF2("Server %d connection.start\n", srvr);
nSent = amqp_write(fdc, connection_start, sizeof(connection_start), frame.ch);
break;
case AMQP_FRAME_METHOD:
DBGPRINTF2("Server %d method : 0x%X\n", srvr, frame.method);
switch (frame.method) {
case AMQP_CONNECTION_START_OK_METHOD:
DBGPRINTF2("Server %d connection.tune\n", srvr);
nSent = amqp_write(fdc, connection_tune, sizeof(connection_tune), frame.ch);
break;
case AMQP_CONNECTION_TUNE_OK_METHOD:
DBGPRINTF2("Client %d connection.tune-ok\n", srvr);
nSent = 0;
break;
case AMQP_CONNECTION_OPEN_METHOD:
nSent = amqp_write(fdc, connection_open_ok, sizeof(connection_open_ok), frame.ch);
DBGPRINTF2("Server %d connection.open\n", srvr);
break;
case AMQP_CHANNEL_OPEN_METHOD:
nSent = amqp_write(fdc, channel_open_ok, sizeof(channel_open_ok), frame.ch);
DBGPRINTF2("Server %d channel.open\n", srvr);
if (my_behaviour == AMQP_BEHAVIOR_NOEXCH) {
close(fdc);
DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
fdc = 0;
frame.type = 0;
}
break;
case AMQP_EXCHANGE_DECLARE_METHOD:
if (my_behaviour == AMQP_BEHAVIOR_BADEXCH) {
nSent = amqp_write(fdc, channel_close_ok_on_badexch, sizeof(channel_close_ok_on_badexch),
frame.ch);
} else {
nSent = amqp_write(fdc, exchange_declare_ok, sizeof(exchange_declare_ok), frame.ch);
}
DBGPRINTF2("Server %d exchange.declare\n", srvr);
if (my_behaviour == AMQP_BEHAVIOR_DECEXCH) {
close(fdc);
DBGPRINTF1("Server AMQP %d on port %d stopped\n", srvr, port);
fdc = 0;
frame.type = 0;
}
break;
case AMQP_CHANNEL_CLOSE_METHOD:
nSent = amqp_write(fdc, channel_close_ok, sizeof(channel_close_ok), frame.ch);
DBGPRINTF2("Server %d channel.close\n", srvr);
break;
case AMQP_CONNECTION_CLOSE_METHOD:
nSent = amqp_write(fdc, connection_close_ok, sizeof(connection_close_ok), frame.ch);
DBGPRINTF2("Server %d connection.close\n", srvr);
break;
case AMQP_BASIC_PUBLISH_METHOD:
p = amqpFieldFprintf("Exchange:", frame.data + 2);
amqpFieldFprintf(", routing-key:", p);
break;
default:
nSent = 0;
}
break;
case AMQP_FRAME_HEADER:
DBGPRINTF2("Server %d HEADERS\n", srvr);
p = amqpFieldUint64(&body_ui64, frame.data);
bsize = (size_t)body_ui64;
p = amqpFieldUint16(&props_flags, p);
if (props_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
p = amqpFieldFprintf(", content-type:", p);
}
if (props_flags & AMQP_BASIC_HEADERS_FLAG) {
p = amqpFieldUint32(&props_header_size, p);
while (props_header_size) {
p = amqpHeaderFprintf(p, &props_header_size);
}
}
if (props_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
if (fpout) fprintf(fpout, ", delivery-mode:%s", (*p++) ? "transient" : "persistent");
}
if (props_flags & AMQP_BASIC_EXPIRATION_FLAG) {
p = amqpFieldFprintf(", expiration:", p);
}
if (props_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
if (fpout) fprintf(fpout, ", timestamp:OK");
p += sizeof(uint64_t);
}
if (props_flags & AMQP_BASIC_APP_ID_FLAG) {
amqpFieldFprintf(", app-id:", p);
}
if (fpout) fprintf(fpout, ", msg:");
break;
case AMQP_FRAME_BODY:
DBGPRINTF2("Server %d Body size left : %zu, received : %zu\n", srvr, bsize, frame.datalen);
bsize -= frame.datalen;
if (fpout) {
fprintf(fpout, "%.*s", (int)frame.datalen, frame.data);
if (frame.data[frame.datalen - 1] != '\n') fprintf(fpout, "\n");
fflush(fpout);
}
break;
default:
DBGPRINTF1("Server %d unsupported frame type %d\n", srvr, frame.type);
close(fdc);
fdc = 0;
frame.type = 0;
frame.framelen = 0;
} /* switch (frame.type) */
nRead -= frame.framelen;
if (nRead > 0) memmove(wrkBuf, wrkBuf + frame.framelen, nRead);
if (nSent < 0) {
close(fdc);
fdc = 0;
}
} /* while(fdc) */
DBGPRINTF2("Leaving thread %d\n", srvr);
}
int main(int argc, char *argv[]) {
int port[2], fds[2], i, opt, nb_port = 1;
int pipeS1toS2[2] = {-1, -1};
int pipeS2toS1[2] = {-1, -1};
int pipeRead[2], pipeWrite[2];
struct sockaddr_in srvAddr[2];
unsigned int addrLen = sizeof(struct sockaddr_in), len;
pid_t pid[2];
fpout = stdout;
while ((opt = getopt(argc, argv, "f:b:w:d")) != -1) {
switch (opt) {
case 'w':
wait_after_accept = atoi(optarg);
break;
case 'd':
debug = 2;
break;
case 'b':
server_behaviors = atoi(optarg);
break;
case 'f':
if (strcmp(optarg, "-")) {
outfile = optarg;
fpout = fopen(optarg, "w");
if (fpout == NULL) {
fprintf(stderr, "file %s could not be created\n", outfile);
exit(1);
}
}
break;
default:
fprintf(stderr, "invalid option '%c' or value missing - terminating...\n", opt);
usage();
break;
}
}
switch (server_behaviors) {
case 0:
behaviors = AMQP_BEHAVIOR_STANDARD;
nb_port = 1;
break;
case 1: /* two standard servers get message successfully */
behaviors = AMQP_BEHAVIOR_STANDARD;
nb_port = 2;
break;
case 2: /* 2 servers first server which disconnect after after open channel : no declare exchange */
behaviors = AMQP_BEHAVIOR_NOEXCH | AMQP_BEHAVIOR_STANDARD << 4;
nb_port = 2;
break;
case 3: /* 2 servers first server which disconnect after declare exchange*/
behaviors = AMQP_BEHAVIOR_DECEXCH | AMQP_BEHAVIOR_STANDARD << 4;
nb_port = 2;
break;
case 4: /* one server with bad exchange declare */
behaviors = AMQP_BEHAVIOR_BADEXCH;
nb_port = 1;
break;
default:
fprintf(stderr, "Invalid behavior");
exit(1);
}
gettimeofday(&dbgtv_base, NULL);
port[0] = port[1] = -1;
if (nb_port == 2) {
if (pipe(pipeS1toS2) == -1 || pipe(pipeS2toS1) == -1) {
fprintf(stderr, "Pipe failed !");
exit(1);
}
}
pipeRead[0] = pipeS2toS1[0];
pipeWrite[0] = pipeS1toS2[1];
pipeRead[1] = pipeS1toS2[0];
pipeWrite[1] = pipeS2toS1[1];
for (i = 0; i < nb_port; i++) {
fds[i] = socket(AF_INET, SOCK_STREAM, 0);
srvAddr[i].sin_family = AF_INET;
srvAddr[i].sin_addr.s_addr = INADDR_ANY;
srvAddr[i].sin_port = 0;
if (bind(fds[i], (struct sockaddr *)&srvAddr[i], addrLen) != 0) errout("bind", 1);
len = addrLen;
if (getsockname(fds[i], (struct sockaddr *)&srvAddr[i], &len) == -1) errout("bind", i + 1);
if ((port[i] = ntohs(srvAddr[i].sin_port)) <= 0) errout("get port", i + 1);
}
for (i = 0; i < nb_port; i++) {
if ((pid[i] = fork()) == -1) {
fprintf(stderr, "Fork failed !");
exit(1);
}
if (pid[i] == 0) {
/* this is the child */
if (fds[1 - i] > 0) close(fds[1 - i]);
amqp_srvr(port[i], i + 1, fds[i], pipeRead[i], pipeWrite[i]);
if (fpout && fpout != stdout) fclose(fpout);
DBGPRINTF2("%s\n", "Leaving server");
return 0;
}
}
if (nb_port == 2)
printf("export AMQPSRVRPID1=%ld AMQPSRVRPID2=%ld PORT_AMQP1=%d PORT_AMQP2=%d", (long)pid[0], (long)pid[1],
port[0], port[1]);
else
printf("export AMQPSRVRPID1=%ld PORT_AMQP1=%d", (long)pid[0], port[0]);
return 0;
}