Skip to content
Snippets Groups Projects
Commit e901fbaaa013 authored by Jean-Francois Pieronne's avatar Jean-Francois Pieronne
Browse files

Update examples, add missing stdint.h

parent d9bae26ee3a1
No related branches found
No related tags found
1 merge request!5Update examples, add missing stdint.h
......@@ -18,3 +18,18 @@
CJSONSHR32/share
RABBITMQSHR32/share
$
$ set default [.examples]
$ cc /float=ieee/ieee=denorm/name=(as_is,short) -
/include=("../", LIBCJSON, LIBRABBITMQ) -
rmq_rpc.c
$ cc /float=ieee/ieee=denorm/name=(as_is,short) -
/include=("../", LIBCJSON, LIBRABBITMQ) -
rpc_server_c.c
$ link rpc_server_c.obj, sys$input:/opt
rmq_rpc.obj
[-]utils.obj
CJSONSHR32/share
RABBITMQSHR32/share
$
$ set default [-]
$
// indent -br -ce -nut -i4 -npcs rmq_rpc.c
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <descrip.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <assert.h>
#include "utils.h"
#include "rmq_rpc.h"
static amqp_socket_t *sock = NULL;
static amqp_connection_state_t conn;
static amqp_rpc_reply_t res;
static amqp_envelope_t envelope;
int
rmq_init_server_c(char const *hostname, int port, char const *username,
char const *password, char const *vhost,
char const *queuename)
{
int status;
conn = amqp_new_connection();
if (!conn) {
fprintf(stderr, "failed to allocate connection object\n");
return 1;
}
sock = amqp_tcp_socket_new(conn);
if (!sock) {
die("creating TCP socket");
}
status = amqp_socket_open(sock, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(
conn,
vhost,
AMQP_DEFAULT_MAX_CHANNELS,
AMQP_DEFAULT_FRAME_SIZE,
AMQP_DEFAULT_HEARTBEAT,
AMQP_SASL_METHOD_PLAIN,
username,
password),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_basic_consume(
conn,
1,
amqp_cstring_bytes(queuename),
amqp_empty_bytes,
0,
0,
0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
return (0);
}
int
rmq_init_server(DSC *hostDSC, int port, DSC *userDSC, DSC * pwdDSC,
DSC *vhostDSC, DSC *queueDSC)
{
char hostname[hostDSC->dsc$w_length + 1];
char username[userDSC->dsc$w_length + 1];
char password[pwdDSC->dsc$w_length + 1];
//char vhost[vhostDSC->dsc$w_length + 1];
char *vhost;
char queuename[queueDSC->dsc$w_length + 1];
strncpy(hostname, hostDSC->dsc$a_pointer, hostDSC->dsc$w_length);
hostname[hostDSC->dsc$w_length] = 0;
strncpy(username, userDSC->dsc$a_pointer, userDSC->dsc$w_length);
username[userDSC->dsc$w_length] = 0;
strncpy(password, pwdDSC->dsc$a_pointer, pwdDSC->dsc$w_length);
password[pwdDSC->dsc$w_length] = 0;
if (vhostDSC) {
vhost = __ALLOCA(vhostDSC->dsc$w_length + 1);
strncpy(vhost, vhostDSC->dsc$a_pointer, vhostDSC->dsc$w_length);
vhost[vhostDSC->dsc$w_length] = 0;
}
strncpy(queuename, queueDSC->dsc$a_pointer, queueDSC->dsc$w_length);
queuename[queueDSC->dsc$w_length] = 0;
return (rmq_init_server_c(
hostname,
port,
username,
password,
vhostDSC ? vhost : AMQP_DEFAULT_VHOST,
queuename));
}
int
rmq_rpc_in_c(char **body_bytes, int *body_len)
{
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
return -1;
}
/*
* Once you've consumed a message and processed it, your code should
* publish to the default exchange (amqp_empty_bytes), with a routing key
* that is specified in the reply_to header in the request message.
* Additionally your code should set the correlation_id header to be the
* same as what is in the request message.
*/
#if DEBUG
printf("||Delivery %u,%d exchange %s %d routingkey %s consumer_tag %s|| \n",
(unsigned)envelope.delivery_tag,
(int)envelope.exchange.len,
(char *)envelope.exchange.bytes,
(int)envelope.routing_key.len,
(char *)envelope.routing_key.bytes,
(char *)envelope.message.properties.reply_to.bytes);
printf("----\n");
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
#endif
*body_bytes = envelope.message.body.bytes;
*body_len = envelope.message.body.len;
return 0;
}
int
rmq_rpc_in(DSC *message_inDSC)
{
int message_len;
char *message_in;
int status;
status = rmq_rpc_in_c(&message_in, &message_len);
if (status)
return (status);
if (message_len > message_inDSC->dsc$w_length)
return -2;
strncpy(message_inDSC->dsc$a_pointer, message_in, message_len);
return (message_len);
}
int
rmq_rpc_reply_c(char const *creply, char const *header)
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes(header);
props.delivery_mode = 2;
//persistent delivery mode
// props.reply_to = amqp_cstring_bytes("");
/*
* Additionally your code should set the correlation_id header to be the
* same as what is in the request message.
*/
props.correlation_id = amqp_cstring_bytes("1");
die_on_error(amqp_basic_publish(
conn,
1,
// your code should publish to the default
// exchange(amqp_empty_bytes)
amqp_empty_bytes,
// with a routing key that is specified in the
// reply_to header in the request message.
amqp_cstring_bytes(
(char *)envelope.message.properties.reply_to.bytes),
0,
0,
&props,
amqp_cstring_bytes(creply)),
"Publishing");
//sleep(30);
die_on_error(amqp_basic_ack(
conn,
1,
(unsigned)envelope.delivery_tag,
0),
"Sending Ack");
amqp_destroy_envelope(&envelope);
return 0;
}
int
rmq_rpc_reply(DSC *replyDSC)
{
char creply[replyDSC->dsc$w_length + 1];
strncpy(creply, replyDSC->dsc$a_pointer, replyDSC->dsc$w_length);
creply[replyDSC->dsc$w_length] = 0;
return (rmq_rpc_reply_c(creply, "text/plain"));
}
int
rmq_shut_server()
{
die_on_amqp_error(amqp_channel_close(
conn,
1,
AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
#include <descrip.h>
typedef struct dsc$descriptor_s DSC;
extern int rmq_init_server_c(char const *hostname, int port,
char const *username, char const *password,
char const *vhost, char const *queuename);
extern int rmq_init_server(DSC *hostDSC, int port, DSC *userDSC, DSC * pwdDSC,
DSC *vhostDSC, DSC *queueDSC);
extern int rmq_rpc_in_c(char **body_bytes, int *body_len);
extern int rmq_rpc_in(DSC *message_inDSC);
extern int rmq_rpc_reply_c(char const *creply, char const *header);
extern int rmq_rpc_reply(DSC *replyDSC);
extern int rmq_shut_server();
// indent -br -ce -nut -i4 -npcs rmq_rpc.c
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <descrip.h>
#include <stdint.h>
#include <string.h>
#include <cJSON.h>
//#include "utils.h"
#include "rmq_rpc.h"
char const *amqp_hostname;
char const *amqp_vhost;
char const *amqp_username;
char const *amqp_password;
int amqp_port;
char const *amqp_queue;
int main() {
char *cptr;
int s;
char *body_bytes;
int body_len;
if (!(amqp_hostname = getenv("RMQ_HOSTNAME")))
amqp_hostname = "localhost";
if (cptr = getenv("RMQ_VHOST"))
amqp_vhost = cptr;
if (cptr = getenv("RMQ_USERNAME"))
amqp_username = cptr;
if (cptr = getenv("RMQ_PASSWORD"))
amqp_password = cptr;
else
amqp_vhost = "/";
if (cptr = getenv("RMQ_PORT"))
amqp_port = atoi(cptr);
else
amqp_port = 5672;
if (!(amqp_queue = getenv("RMQ_QUEUE")))
amqp_queue = "";
s = rmq_init_server_c(amqp_hostname, amqp_port, amqp_username,
amqp_password, amqp_vhost, amqp_queue);
/*
for (;;) {
s = rmq_rpc_in_c(&body_bytes, &body_len);
{
cJSON *root;
char cbody[body_len + 1];
strncpy(cbody, body_bytes, body_len);
cbody[body_len] = 0;
printf("%s\n", cbody);
s = rmq_rpc_reply_c(cbody, "application/json");
}
*/
for (;;) {
cJSON *cjson_root;
cJSON *cjson_in;
cJSON *cjson_out;
cJSON *cjson_stra;
cJSON *cjson_strb;
char *stra;
char *strb;
s = rmq_rpc_in_c(&body_bytes, &body_len);
// printf("%.*s\n", body_len, body_bytes);
cjson_root = cJSON_ParseWithLengthOpts(body_bytes, body_len, 0, 0);
cjson_in = cJSON_GetObjectItem(cjson_root, "in");
if (cjson_in == NULL) {
cJSON_Delete(cjson_root);
s = rmq_rpc_reply_c("{\"status\":0}", "application/json");
continue;
}
cjson_out = cJSON_GetObjectItem(cjson_root, "out");
if (cjson_out == NULL) {
cJSON_Delete(cjson_root);
s = rmq_rpc_reply_c("{\"status\":0}", "application/json");
continue;
}
cjson_stra = cJSON_GetObjectItem(cjson_in, "FORM_FIRSTFIELD");
if (cjson_stra == NULL) {
cJSON_Delete(cjson_root);
s = rmq_rpc_reply_c("{\"status\":0}", "application/json");
continue;
}
stra = cJSON_GetStringValue(cjson_stra);
cjson_strb = cJSON_GetObjectItem(cjson_in, "FORM_SECONDFIELD");
if (cjson_strb == NULL) {
cJSON_Delete(cjson_root);
s = rmq_rpc_reply_c("{\"status\":0}", "application/json");
continue;
}
strb = cJSON_GetStringValue(cjson_strb);
cJSON_AddNumberToObject(cjson_out, "status", 1);
{
char strr[strlen(stra) + strlen(strb) + 2];
char *str;
sprintf(strr, "%s %s", stra, strb);
cJSON_AddStringToObject(cjson_out, "value", strr);
str = cJSON_PrintUnformatted(cjson_out);
// puts(str);
s = rmq_rpc_reply_c(str, "application/json");
free(str);
}
cJSON_Delete(cjson_root);
}
}
stdint.h 0 → 100644
#include <inttypes.h>
#include <unistd.h>
#include <limits.h>
#ifndef SIZE_MAX // [
#define SIZE_MAX __UINT32_MAX
#endif // SIZE_MAX ]
#define UINT8_MAX UCHAR_MAX
#define UINT16_MAX __UINT16_MAX
#define UINT32_MAX __UINT32_MAX
#define UINT64_MAX __UINT64_MAX
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment