#include #include #include #include #include #include #include struct endpoint { uint16_t id; skipstone_message_handler *handler; void *context; }; struct _skipstone_message_service { skipstone_map *endpoints; skipstone_queue *pending; }; skipstone_message_service *skipstone_message_service_new() { skipstone_message_service *service; if ((service = malloc(sizeof(*service))) == NULL) { goto error_malloc_service; } if ((service->endpoints = skipstone_map_new()) == NULL) { goto error_map_new_endpoints; } if ((service->pending = skipstone_queue_new()) == NULL) { goto error_queue_new_pending; } return service; error_queue_new_pending: skipstone_map_destroy(service->endpoints); error_map_new_endpoints: free(service); error_malloc_service: return NULL; } int skipstone_message_service_register(skipstone_message_service *service, uint16_t id, skipstone_message_handler *handler, void *context) { struct endpoint *endpoint; if ((endpoint = malloc(sizeof(*endpoint))) == NULL) { goto error_malloc_endpoint; } endpoint->id = id; endpoint->handler = handler; endpoint->context = context; return skipstone_map_set(service->endpoints, id, endpoint); error_malloc_endpoint: return -1; } int skipstone_message_service_deregister(skipstone_message_service *service, uint16_t id) { return skipstone_map_set((skipstone_map *)service, id, NULL); } int skipstone_message_service_queue(skipstone_message_service *service, void *buf, uint16_t size, uint16_t id) { skipstone_message_header *message; if ((message = malloc(sizeof(*message) + size)) == NULL) { goto error_malloc_message; } message->size = size; message->id = id; memcpy(message + 1, buf, size); return 0; error_malloc_message: return -1; } int skipstone_message_service_run(skipstone_message_service *service, skipstone_link *link) { void *buf; if ((buf = malloc(SKIPSTONE_MESSAGE_MAX_PAYLOAD)) == NULL) { goto error_malloc_buf; } while (1) { uint16_t size, id; skipstone_message_handler *handler; skipstone_message_header *message; if (skipstone_link_recv(link, buf, &size, &id) < 0) { goto error_io; } if ((handler = skipstone_map_get(service->endpoints, id)) != NULL) { if (handler(service, buf, size, id) < 0) { goto error_io; } } while (skipstone_queue_pop(service->pending, (void **)&message)) { if (skipstone_link_send(link, message + 1, message->size, message->id) < 0) { goto error_io; } } } return 0; error_io: free(buf); error_malloc_buf: return -1; }