skipstone/src/message.c

341 lines
7.3 KiB
C
Raw Normal View History

2017-11-23 17:22:36 +00:00
#include <stdlib.h>
2017-11-23 18:02:38 +00:00
#include <string.h>
2017-11-23 17:22:36 +00:00
#include <skipstone/link.h>
#include <skipstone/map.h>
#include <skipstone/queue.h>
#include <skipstone/message.h>
2017-11-24 15:25:28 -06:00
struct part {
2017-11-24 23:38:23 -06:00
uint8_t size;
2017-11-24 15:25:28 -06:00
};
struct _skipstone_message {
skipstone_queue *parts;
2017-11-24 23:49:00 -06:00
uint16_t size;
2017-11-24 15:25:28 -06:00
};
2017-11-23 17:22:36 +00:00
struct endpoint {
skipstone_message_handler *handler;
void *context;
2017-11-24 15:25:28 -06:00
uint16_t id;
2017-11-23 17:22:36 +00:00
};
struct _skipstone_message_service {
skipstone_map *endpoints;
skipstone_queue *pending;
void *buf;
2017-11-23 17:22:36 +00:00
};
2017-11-24 23:49:00 -06:00
skipstone_message *skipstone_message_new() {
2017-11-24 15:25:28 -06:00
skipstone_message *message;
if ((message = malloc(sizeof(*message))) == NULL) {
goto error_malloc_message;
}
if ((message->parts = skipstone_queue_new()) == NULL) {
goto error_queue_new_parts;
}
message->size = 0;
return message;
error_queue_new_parts:
free(message);
error_malloc_message:
return NULL;
}
2017-11-26 16:22:00 -06:00
void skipstone_message_destroy(skipstone_message *message) {
void *part;
while (skipstone_queue_remove(message->parts, &part)) {
free(part);
}
skipstone_queue_destroy(message->parts);
free(message);
return;
}
static int append_part(skipstone_message *message, void *buf, uint8_t size) {
2017-11-24 15:25:28 -06:00
struct part *part;
if ((part = malloc(sizeof(*part) + size)) == NULL) {
goto error_malloc_part;
}
part->size = size;
2017-11-24 15:25:28 -06:00
2017-11-24 23:38:23 -06:00
memcpy(part + 1, buf, size);
2017-11-24 15:25:28 -06:00
if (skipstone_queue_add(message->parts, part) < 0) {
goto error_queue_add_parts;
}
return 0;
error_queue_add_parts:
free(part);
error_malloc_part:
return -1;
}
2017-11-24 23:38:23 -06:00
int skipstone_message_append_string(skipstone_message *message,
char *string, uint8_t size) {
struct part *part;
uint8_t bodysz = sizeof(size) + size;
if ((part = malloc(sizeof(*part) + bodysz)) == NULL) {
goto error_malloc_part;
}
part->size = bodysz;
memcpy(part + 1, &size, sizeof(size));
memcpy(((uint8_t *)(part + 1)) + sizeof(size), string, size);
if (skipstone_queue_add(message->parts, part) < 0) {
goto error_queue_add_part;
2017-11-24 23:49:00 -06:00
}
message->size += bodysz;
2017-11-24 23:49:00 -06:00
return 0;
error_queue_add_part:
free(part);
error_malloc_part:
return -1;
2017-11-24 15:25:28 -06:00
}
2017-11-26 16:22:00 -06:00
uint16_t skipstone_message_size(skipstone_message *message) {
return message->size;
}
2017-11-24 23:38:23 -06:00
int skipstone_message_append_uint8(skipstone_message *message, uint8_t value) {
if (append_part(message, &value, sizeof(value)) < 0) {
2017-11-24 23:49:00 -06:00
return -1;
}
message->size += sizeof(value);
return 0;
2017-11-24 23:38:23 -06:00
}
2017-11-24 15:25:28 -06:00
2017-11-24 23:38:23 -06:00
int skipstone_message_append_uint16(skipstone_message *message, uint16_t value) {
if (append_part(message, &value, sizeof(value)) < 0) {
2017-11-24 23:49:00 -06:00
return -1;
}
message->size += sizeof(value);
return 0;
2017-11-24 15:25:28 -06:00
}
2017-11-26 16:22:00 -06:00
int skipstone_message_append_uint32(skipstone_message *message, uint32_t value) {
if (append_part(message, &value, sizeof(value)) < 0) {
return -1;
}
message->size += sizeof(value);
return 0;
}
int skipstone_message_pack(skipstone_message *message, void *buf) {
2017-11-24 15:25:28 -06:00
size_t offset = 0;
struct part *part;
if (message->size > SKIPSTONE_MESSAGE_MAX_PAYLOAD) {
return -1;
}
2017-11-24 15:25:28 -06:00
while (skipstone_queue_remove(message->parts, (void **)&part)) {
memcpy(((uint8_t *)buf) + offset, part + 1, part->size);
2017-11-24 15:25:28 -06:00
offset += part->size;
2017-11-24 15:25:28 -06:00
}
return 0;
}
2017-11-23 17:22:36 +00:00
skipstone_message_service *skipstone_message_service_new() {
skipstone_message_service *service;
if ((service = malloc(sizeof(*service))) == NULL) {
goto error_malloc_service;
}
if ((service->buf = malloc(SKIPSTONE_MESSAGE_MAX_PAYLOAD)) == NULL) {
goto error_malloc_buf;
}
if ((service->endpoints = skipstone_map_new()) == NULL) {
goto error_map_new_endpoints;
}
if ((service->pending = skipstone_queue_new()) == NULL) {
goto error_queue_new_pending;
}
return service;
error_queue_new_pending:
skipstone_map_destroy(service->endpoints, free);
error_map_new_endpoints:
free(service->buf);
error_malloc_buf:
free(service);
error_malloc_service:
return NULL;
2017-11-23 17:22:36 +00:00
}
void skipstone_message_service_destroy(skipstone_message_service *service) {
skipstone_message *message;
skipstone_map_destroy(service->endpoints, free);
while (skipstone_queue_remove(service->pending, (void **)&message)) {
skipstone_message_destroy(message);
}
skipstone_queue_destroy(service->pending);
free(service->buf);
free(service);
return;
}
2017-11-23 17:22:36 +00:00
int skipstone_message_service_register(skipstone_message_service *service,
uint16_t id, skipstone_message_handler *handler, void *context) {
struct endpoint *endpoint;
if ((endpoint = malloc(sizeof(*endpoint))) == NULL) {
goto error_malloc_endpoint;
}
endpoint->id = id;
endpoint->handler = handler;
endpoint->context = context;
return skipstone_map_set(service->endpoints, id, endpoint);
error_malloc_endpoint:
return -1;
}
int skipstone_message_service_deregister(skipstone_message_service *service, uint16_t id) {
return skipstone_map_set((skipstone_map *)service, id, NULL);
}
2017-11-26 16:22:00 -06:00
int skipstone_message_service_queue_packed(skipstone_message_service *service,
2017-11-23 18:02:38 +00:00
void *buf, uint16_t size, uint16_t id) {
skipstone_message_header *message;
if ((message = malloc(sizeof(*message) + size)) == NULL) {
goto error_malloc_message;
}
message->size = size;
message->id = id;
memcpy(message + 1, buf, size);
2017-11-24 12:18:02 -06:00
if (skipstone_queue_add(service->pending, message) < 0) {
goto error_queue_add_pending;
2017-11-24 02:27:57 +00:00
}
2017-11-23 18:02:38 +00:00
return 0;
2017-11-24 12:18:02 -06:00
error_queue_add_pending:
2017-11-24 02:27:57 +00:00
free(message);
2017-11-23 18:02:38 +00:00
error_malloc_message:
return -1;
}
2017-11-26 16:22:00 -06:00
int skipstone_message_service_queue(skipstone_message_service *service,
skipstone_message *message, uint16_t id) {
skipstone_message_header *packed;
if ((packed = malloc(sizeof(*packed) + message->size)) == NULL) {
goto error_malloc_packed;
}
packed->size = message->size;
packed->id = id;
if (skipstone_message_pack(message, packed + 1) < 0) {
goto error_pack;
}
if (skipstone_queue_add(service->pending, packed) < 0) {
goto error_queue_add_pending;
}
return 0;
error_queue_add_pending:
error_pack:
free(message);
error_malloc_packed:
return -1;
}
int skipstone_message_service_next_event(skipstone_message_service *service,
2017-11-23 17:22:36 +00:00
skipstone_link *link) {
skipstone_message_header *message;
struct endpoint *endpoint;
uint16_t size, id;
2017-11-23 17:22:36 +00:00
if (skipstone_link_recv(link, service->buf, &size, &id) < 0) {
goto error_io;
2017-11-23 17:22:36 +00:00
}
if ((endpoint = skipstone_map_get(service->endpoints, id)) != NULL) {
if (endpoint->handler(service, service->buf, size, id, endpoint->context) < 0) {
2017-11-23 18:02:38 +00:00
goto error_io;
}
}
2017-11-23 18:02:38 +00:00
while (skipstone_queue_remove(service->pending, (void **)&message)) {
if (skipstone_link_send(link, message + 1, message->size, message->id) < 0) {
goto error_io;
2017-11-23 18:02:38 +00:00
}
2017-11-23 17:22:36 +00:00
}
return 0;
2017-11-23 18:02:38 +00:00
error_io:
return -1;
}
2017-11-23 18:02:38 +00:00
int skipstone_message_service_run(skipstone_message_service *service,
skipstone_link *link) {
while (1) {
if (skipstone_message_service_next_event(service, link) < 0) {
goto error_service_next_event;
}
}
return 0;
error_service_next_event:
2017-11-23 17:22:36 +00:00
return -1;
}