skipstone/src/message.c

133 lines
3 KiB
C
Raw Normal View History

2017-11-23 17:22:36 +00:00
#include <stdlib.h>
2017-11-23 18:02:38 +00:00
#include <string.h>
2017-11-23 17:22:36 +00:00
#include <skipstone/link.h>
#include <skipstone/map.h>
#include <skipstone/queue.h>
#include <skipstone/message.h>
/* One registered message endpoint: the handler and context stored for an id. */
struct endpoint {
/* Message id this endpoint was registered under. */
uint16_t id;
/* Callback invoked by the run loop for received messages carrying this id. */
skipstone_message_handler *handler;
/* Caller-supplied pointer saved at registration time. */
void *context;
};
/* State of a message service: its registered endpoints and outgoing messages. */
struct _skipstone_message_service {
/* Map from message id to the struct endpoint registered for that id. */
skipstone_map *endpoints;
/* Queue of outgoing messages (header immediately followed by payload)
 * waiting to be sent by the run loop. */
skipstone_queue *pending;
};
/*
 * Allocate a message service together with a freshly created endpoint map
 * and pending-message queue.
 *
 * Returns the new service, or NULL if any of the three allocations fails.
 * On failure everything that was allocated up to that point is released.
 */
skipstone_message_service *skipstone_message_service_new() {
    skipstone_message_service *svc = malloc(sizeof(*svc));

    if (svc == NULL) {
        return NULL;
    }

    svc->endpoints = skipstone_map_new();
    if (svc->endpoints == NULL) {
        free(svc);
        return NULL;
    }

    svc->pending = skipstone_queue_new();
    if (svc->pending == NULL) {
        /* Undo in reverse order of construction. */
        skipstone_map_destroy(svc->endpoints);
        free(svc);
        return NULL;
    }

    return svc;
}
/*
 * Register `handler` (together with `context`) as the endpoint for `id`.
 *
 * A new endpoint record is allocated and stored in the service's endpoint
 * map under `id`. If a record was already stored for that id, it is freed
 * once the new one has been stored successfully.
 *
 * Returns the (non-negative) result of skipstone_map_set() on success.
 * Returns -1 if the record cannot be allocated, or the negative result of
 * skipstone_map_set() if the map update fails; in both cases nothing is
 * leaked and the existing registration is left untouched.
 */
int skipstone_message_service_register(skipstone_message_service *service,
        uint16_t id, skipstone_message_handler *handler, void *context) {
    struct endpoint *endpoint;
    struct endpoint *previous;
    int rc = -1;

    if ((endpoint = malloc(sizeof(*endpoint))) == NULL) {
        goto error_malloc_endpoint;
    }
    endpoint->id = id;
    endpoint->handler = handler;
    endpoint->context = context;

    /* Remember any record we are about to replace so it can be released. */
    previous = skipstone_map_get(service->endpoints, id);

    if ((rc = skipstone_map_set(service->endpoints, id, endpoint)) < 0) {
        goto error_map_set_endpoints;
    }

    free(previous);
    return rc;

error_map_set_endpoints:
    /* The map did not take the record, so it is still ours to free. */
    free(endpoint);
error_malloc_endpoint:
    return rc;
}
/*
 * Remove the endpoint registered for `id`.
 *
 * The entry for `id` in the service's endpoint map is set to NULL and the
 * endpoint record that was stored there (if any) is freed.
 *
 * Returns the result of skipstone_map_set(); when that result is negative
 * the existing record is left in place and not freed.
 */
int skipstone_message_service_deregister(skipstone_message_service *service, uint16_t id) {
    /* Operate on the endpoint map itself, not on the service pointer cast to a map. */
    struct endpoint *endpoint = skipstone_map_get(service->endpoints, id);
    int rc = skipstone_map_set(service->endpoints, id, NULL);

    if (rc >= 0) {
        /* The record was allocated by register; release it now that the map no longer refers to it. */
        free(endpoint);
    }
    return rc;
}
2017-11-23 18:02:38 +00:00
/*
 * Queue a message for sending by the run loop.
 *
 * A single allocation holding a skipstone_message_header followed directly
 * by a copy of the `size` payload bytes at `buf` is appended to the
 * service's pending queue. The caller keeps ownership of `buf`.
 *
 * `buf` may be NULL only when `size` is 0.
 *
 * Returns 0 on success, or -1 if `buf` is NULL with a non-zero `size`, if
 * the allocation fails, or if the message cannot be added to the queue.
 */
int skipstone_message_service_queue(skipstone_message_service *service,
        void *buf, uint16_t size, uint16_t id) {
    skipstone_message_header *message;

    if (buf == NULL && size > 0) {
        goto error_malloc_message;
    }

    if ((message = malloc(sizeof(*message) + size)) == NULL) {
        goto error_malloc_message;
    }
    message->size = size;
    message->id = id;

    /* memcpy() with a NULL source is undefined even for zero bytes, so skip
     * the copy entirely for empty payloads. */
    if (size > 0) {
        memcpy(message + 1, buf, size);
    }

    if (skipstone_queue_add(service->pending, message) < 0) {
        goto error_queue_add_pending;
    }

    return 0;

error_queue_add_pending:
    free(message);
error_malloc_message:
    return -1;
}
2017-11-23 17:22:36 +00:00
/*
 * Run the message loop for `service` over `link`.
 *
 * Each iteration receives one message from the link, passes it to the
 * handler registered for its id (a message whose id has no registered
 * endpoint is ignored), and then sends every message currently in the
 * service's pending queue. Each queued message is freed after the send
 * attempt, whether or not the send succeeded.
 *
 * The loop only ends on failure. Returns -1 if the receive buffer cannot be
 * allocated, or if a receive, a handler call, or a send reports a negative
 * result.
 */
int skipstone_message_service_run(skipstone_message_service *service,
        skipstone_link *link) {
    void *buf;

    if ((buf = malloc(SKIPSTONE_MESSAGE_MAX_PAYLOAD)) == NULL) {
        goto error_malloc_buf;
    }

    while (1) {
        struct endpoint *endpoint;
        void *item;
        uint16_t size, id;

        if (skipstone_link_recv(link, buf, &size, &id) < 0) {
            goto error_io;
        }

        if ((endpoint = skipstone_map_get(service->endpoints, id)) != NULL) {
            /* NOTE(review): the handler is given the service rather than
             * endpoint->context - confirm against the declaration of
             * skipstone_message_handler. */
            if (endpoint->handler(service, buf, size, id) < 0) {
                goto error_io;
            }
        }

        /* NOTE(review): assumes skipstone_queue_remove() returns non-zero
         * exactly when it hands back a message - confirm. */
        while (skipstone_queue_remove(service->pending, &item)) {
            skipstone_message_header *message = item;
            int rc = skipstone_link_send(link, message + 1, message->size, message->id);

            /* The queue handed ownership back to us; release the message
             * regardless of whether the send succeeded. */
            free(message);
            if (rc < 0) {
                goto error_io;
            }
        }
    }

error_io:
    free(buf);
error_malloc_buf:
    return -1;
}