Skip to content
Snippets Groups Projects
Commit 2c56970a9641 authored by jfp's avatar jfp
Browse files

First version which succesfully exchange a message

parent f71fa07f8835
No related branches found
No related tags found
1 merge request!2Topic/default/wip0903
...@@ -27,11 +27,7 @@ ...@@ -27,11 +27,7 @@
#define SOFTWAREID SOFTWARENM " UNKNOWN-" SOFTWAREVN #define SOFTWAREID SOFTWARENM " UNKNOWN-" SOFTWAREVN
#endif #endif
#ifndef __VAX
#pragma nomember_alignment
#endif
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <ctype.h> #include <ctype.h>
...@@ -34,7 +30,8 @@ ...@@ -34,7 +30,8 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <ctype.h> #include <ctype.h>
#include <assert.h>
#include <errno.h> #include <errno.h>
#include <time.h> #include <time.h>
#include <ssdef.h> #include <ssdef.h>
...@@ -72,6 +69,8 @@ ...@@ -72,6 +69,8 @@
amqp_socket_t *socket = NULL; amqp_socket_t *socket = NULL;
amqp_connection_state_t conn; amqp_connection_state_t conn;
amqp_bytes_t reply_to_queue; amqp_bytes_t reply_to_queue;
amqp_queue_declare_ok_t *amqp_reply_queue;
amqp_frame_t frame;
/* /*
* *
...@@ -111,6 +110,16 @@ ...@@ -111,6 +110,16 @@
"Logging in"); "Logging in");
amqp_channel_open(conn, 1); amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
// create private reply_to queue
amqp_reply_queue = amqp_queue_declare(
conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
reply_to_queue = amqp_bytes_malloc_dup(amqp_reply_queue->queue);
if (reply_to_queue.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return SS$_ABORT;
}
return 1; return 1;
} }
...@@ -114,7 +123,120 @@ ...@@ -114,7 +123,120 @@
return 1; return 1;
} }
/*
send the message
*/
amqp_send_message(const char *messagebody) {
int s;
/*
set properties
*/
amqp_basic_properties_t props;
memset(&props, 0, sizeof(props));
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
if (props.reply_to.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 44;
}
props.correlation_id = amqp_cstring_bytes("1");
/*
publish
*/
die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(amqp_exchange),
amqp_cstring_bytes(amqp_routingkey), 0, 0,
&props, amqp_cstring_bytes(messagebody)),
"Publishing");
amqp_bytes_free(props.reply_to);
/*
wait an answer
*/
{
amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
// amqp_bytes_free(reply_to_queue);
{
int result;
amqp_basic_deliver_t *d;
amqp_basic_properties_t *p;
size_t body_target;
size_t body_received;
for (;;) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
break;
}
if (frame.frame_type != AMQP_FRAME_METHOD) {
continue;
}
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
}
d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
break;
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
abort();
}
p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n", (int)p->content_type.len,
(char *)p->content_type.bytes);
}
body_target = (size_t)frame.payload.properties.body_size;
body_received = 0;
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
break;
}
if (frame.frame_type != AMQP_FRAME_BODY) {
fprintf(stderr, "Expected body!");
abort();
}
body_received += frame.payload.body_fragment.len;
assert(body_received <= body_target);
// amqp_dump(frame.payload.body_fragment.bytes,
// frame.payload.body_fragment.len);
}
if (body_received != body_target) {
/* Can only happen when amqp_simple_wait_frame returns <= 0 */
/* We break here to close the connection */
break;
}
/* everything was fine, we can quit now because we received the reply */
break;
}
}
}
return 1;
}
int rmqplus() { int rmqplus() {
cJSON *cjson_root, *cjson_in, *cjson_out, *cjson_header, *cjson_obj; cJSON *cjson_root, *cjson_in, *cjson_out, *cjson_header, *cjson_obj;
char *cptr, *cname, *cvalue; char *cptr, *cname, *cvalue;
...@@ -117,7 +239,8 @@ ...@@ -117,7 +239,8 @@
int rmqplus() { int rmqplus() {
cJSON *cjson_root, *cjson_in, *cjson_out, *cjson_header, *cjson_obj; cJSON *cjson_root, *cjson_in, *cjson_out, *cjson_header, *cjson_obj;
char *cptr, *cname, *cvalue; char *cptr, *cname, *cvalue;
int s;
cjson_root = cjson_create_root(); cjson_root = cjson_create_root();
cjson_in = cJSON_GetObjectItem(cjson_root, "in"); cjson_in = cJSON_GetObjectItem(cjson_root, "in");
...@@ -154,8 +277,10 @@ ...@@ -154,8 +277,10 @@
} }
} }
} }
fputs(cJSON_PrintUnformatted(cjson_root), stdout); s = amqp_send_message(cJSON_PrintUnformatted(cjson_root));
fputs("\n", stdout); // fputs(cJSON_PrintUnformatted(cjson_root), stdout);
printf("%.*s\n", frame.payload.body_fragment.len,
(char *)frame.payload.body_fragment.bytes);
cJSON_Delete(cjson_root); cJSON_Delete(cjson_root);
return (1); return (1);
} }
...@@ -166,6 +291,7 @@ ...@@ -166,6 +291,7 @@
main() { main() {
char* cptr; char* cptr;
int s;
if ((stdout = freopen("SYS$OUTPUT", "w", stdout, "ctx=bin")) == NULL) if ((stdout = freopen("SYS$OUTPUT", "w", stdout, "ctx=bin")) == NULL)
exit(vaxc$errno); exit(vaxc$errno);
...@@ -178,6 +304,12 @@ ...@@ -178,6 +304,12 @@
if (cptr = getenv("RMQPLUS_VHOST")) if (cptr = getenv("RMQPLUS_VHOST"))
amqp_vhost = cptr; amqp_vhost = cptr;
if (cptr = getenv("RMQPLUS_USERNAME"))
amqp_username = cptr;
if (cptr = getenv("RMQPLUS_PASSWORD"))
amqp_password = cptr;
else else
amqp_vhost = "/"; amqp_vhost = "/";
...@@ -189,5 +321,8 @@ ...@@ -189,5 +321,8 @@
if (!(amqp_exchange = getenv("RMQPLUS_EXCHANGE"))) if (!(amqp_exchange = getenv("RMQPLUS_EXCHANGE")))
amqp_exchange = ""; amqp_exchange = "";
if (!(amqp_routingkey = getenv("RMQPLUS_ROUTING_KEY")))
amqp_routingkey = "";
CgiLibEnvironmentSetDebug(Debug); CgiLibEnvironmentSetDebug(Debug);
...@@ -192,5 +327,7 @@ ...@@ -192,5 +327,7 @@
CgiLibEnvironmentSetDebug(Debug); CgiLibEnvironmentSetDebug(Debug);
amqp_init();
time(&UnixTime); time(&UnixTime);
UnixTmPtr = localtime(&UnixTime); UnixTmPtr = localtime(&UnixTime);
if (!strftime(FirstUsedDateTime, sizeof(FirstUsedDateTime), if (!strftime(FirstUsedDateTime, sizeof(FirstUsedDateTime),
...@@ -211,4 +348,4 @@ ...@@ -211,4 +348,4 @@
} }
exit(SS$_NORMAL); exit(SS$_NORMAL);
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment