Commit f69ddcc5 authored by legoc's avatar legoc
Browse files

Beginning with publisher topics in bytes

parent 407047d7
......@@ -96,6 +96,12 @@ struct StringId {
static std::string from(const std::string& id, const std::string& name);
};
struct TopicId {
static std::string from(uint32_t id);
static std::string from(uint32_t id, const std::string& name);
};
}
std::ostream& operator<<(std::ostream& os, const cameo::Endpoint& endpoint);
......
......@@ -46,6 +46,17 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return std::unique_ptr<Event>(nullptr);
}
std::cout << "Received " << message << " " << message.size() << std::endl;
for (size_t i = 0; i < message.size(); ++i) {
std::cout << "Received " << (int)message[i] << std::endl;
}
std::cout << "STATUS " << std::endl;
for (size_t i = 0; i < message::Event::STATUS.size(); ++i) {
std::cout << " " << (int)message::Event::STATUS[i] << std::endl;
}
if (message == message::Event::STATUS) {
message = m_impl->receive();
......
......@@ -19,6 +19,29 @@
namespace cameo {
namespace message {
namespace Event {
const uint32_t SYNC_value = 0x00000001;
const uint32_t CANCEL_value = 0x00000002;
const uint32_t STREAM_value = 0x00000003;
const uint32_t ENDSTREAM_temp_value = 0x00000004;
const uint32_t STATUS_value = 0x00000005;
const uint32_t RESULT_value = 0x00000006;
const uint32_t KEYVALUE_value = 0x00000007;
const std::string SYNC = TopicId::from(SYNC_value);
const std::string CANCEL = TopicId::from(CANCEL_value);
const std::string STREAM = TopicId::from(STREAM_value);
const std::string ENDSTREAM_temp = TopicId::from(ENDSTREAM_temp_value);
const std::string STATUS = TopicId::from(STATUS_value);
const std::string RESULT = TopicId::from(RESULT_value);
const std::string KEYVALUE = TopicId::from(KEYVALUE_value);
}
}
std::string createSyncRequest() {
json::StringObject request;
......
......@@ -17,6 +17,7 @@
#ifndef CAMEO_REQUESTS_H_
#define CAMEO_REQUESTS_H_
#include "Strings.h"
#include <string>
#include <vector>
......@@ -62,13 +63,30 @@ namespace message {
const int STREAM_END = 40;
namespace Event {
constexpr const char* SYNC = "sync";
constexpr const char* CANCEL = "cancel";
constexpr const char* STREAM = "stream";
constexpr const char* ENDSTREAM_temp = "endstream";
constexpr const char* STATUS = "status";
constexpr const char* RESULT = "result";
constexpr const char* KEYVALUE = "keyvalue";
// constexpr const char* SYNC = "sync";
// constexpr const char* CANCEL = "cancel";
// constexpr const char* STREAM = "stream";
// constexpr const char* ENDSTREAM_temp = "endstream";
// constexpr const char* STATUS = "status";
// constexpr const char* RESULT = "result";
// constexpr const char* KEYVALUE = "keyvalue";
extern const uint32_t SYNC_value;
extern const uint32_t CANCEL_value;
extern const uint32_t STREAM_value;
extern const uint32_t ENDSTREAM_temp_value;
extern const uint32_t STATUS_value;
extern const uint32_t RESULT_value;
extern const uint32_t KEYVALUE_value;
extern const std::string SYNC;
extern const std::string CANCEL;
extern const std::string STREAM;
extern const std::string ENDSTREAM_temp;
extern const std::string STATUS;
extern const std::string RESULT;
extern const std::string KEYVALUE;
}
namespace ApplicationIdentity {
......
......@@ -228,6 +228,38 @@ std::string StringId::from(const std::string& id, const std::string& name) {
return id + ":" + name;
}
std::string TopicId::from(uint32_t id) {
std::string result;
size_t idSize = sizeof(id);
result.resize(idSize);
const char *dest = result.c_str();
memcpy(static_cast<void *>(&id), dest, idSize);
return result;
}
std::string TopicId::from(uint32_t id, const std::string& name) {
std::string result;
size_t idSize = sizeof(id);
result.resize(idSize + name.size());
char *dest = result.data();
memcpy(dest, static_cast<void *>(&id), idSize);
dest += idSize;
memcpy(dest, name.data(), name.length());
return result;
}
std::ostream& operator<<(std::ostream& os, const cameo::Endpoint& endpoint) {
os << endpoint.toString();
......
......@@ -34,6 +34,8 @@ EventStreamSocketZmq::~EventStreamSocketZmq() {
void EventStreamSocketZmq::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
std::cout << "EventStreamSocketZmq::init" << std::endl;
m_context = dynamic_cast<ContextZmq *>(context);
std::stringstream cancelEndpoint;
......@@ -47,15 +49,21 @@ void EventStreamSocketZmq::init(Context * context, const Endpoint& endpoint, Req
m_socket = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(m_context->getContext(), zmq::socket_type::sub));
vector<string> streamList;
streamList.push_back(message::Event::STATUS);
streamList.push_back(message::Event::RESULT);
streamList.push_back(message::Event::KEYVALUE);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
// vector<string> streamList;
// streamList.push_back(message::Event::STATUS);
// streamList.push_back(message::Event::RESULT);
// streamList.push_back(message::Event::KEYVALUE);
// streamList.push_back(message::Event::CANCEL);
//
// for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
// m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
// }
// m_socket->setsockopt(ZMQ_SUBSCRIBE, &message::Event::STATUS, 4);
// m_socket->setsockopt(ZMQ_SUBSCRIBE, &message::Event::RESULT, 4);
// m_socket->setsockopt(ZMQ_SUBSCRIBE, &message::Event::KEYVALUE, 4);
// m_socket->setsockopt(ZMQ_SUBSCRIBE, &message::Event::CANCEL, 4);
m_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
m_socket->connect(endpoint.toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
......@@ -68,6 +76,8 @@ void EventStreamSocketZmq::init(Context * context, const Endpoint& endpoint, Req
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
std::cout << "EventStreamSocketZmq::init polling" << std::endl;
while (true) {
try {
requestSocket->requestJSON(createSyncRequest());
......@@ -82,6 +92,8 @@ void EventStreamSocketZmq::init(Context * context, const Endpoint& endpoint, Req
break;
}
}
std::cout << "EventStreamSocketZmq::init ok" << std::endl;
}
void EventStreamSocketZmq::send(const std::string& data) {
......@@ -107,15 +119,17 @@ std::string EventStreamSocketZmq::receive(bool blocking) {
void EventStreamSocketZmq::cancel() {
if (m_cancelSocket.get() != nullptr) {
string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, zmq::send_flags::sndmore);
m_cancelSocket->send(requestData, zmq::send_flags::none);
}
std::cout << "TODO EventStreamSocketZmq::cancel" << std::endl;
// if (m_cancelSocket.get() != nullptr) {
// string data(message::Event::CANCEL);
// zmq::message_t requestType(data.length());
// zmq::message_t requestData(data.length());
// memcpy(requestType.data(), message::Event::CANCEL, data.length());
// memcpy(requestData.data(), data.c_str(), data.length());
// m_cancelSocket->send(requestType, zmq::send_flags::sndmore);
// m_cancelSocket->send(requestData, zmq::send_flags::none);
// }
}
void EventStreamSocketZmq::close() {
......
......@@ -49,14 +49,20 @@ void OutputStreamSocketZmq::init(Context * context, const Endpoint& endpoint, Re
std::vector<std::string> topicsList;
// Get the topic id.
std::string topicId = StringId::from(message::Event::STREAM, m_name);
// std::string topicId = StringId::from(message::Event::STREAM, m_name);
topicsList.push_back(topicId);
topicsList.push_back(message::Event::CANCEL);
// topicsList.push_back(topicId);
// topicsList.push_back(message::Event::CANCEL);
for (std::vector<std::string>::const_iterator s = topicsList.begin(); s != topicsList.end(); ++s) {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
// for (std::vector<std::string>::const_iterator s = topicsList.begin(); s != topicsList.end(); ++s) {
// m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
// }
// Get the topic id.
std::string topicId = TopicId::from(message::Event::STREAM_value, m_name);
m_socket->setsockopt(ZMQ_SUBSCRIBE, topicId.c_str(), topicId.length());
m_socket->setsockopt(ZMQ_SUBSCRIBE, &message::Event::CANCEL, 4);
m_socket->connect(endpoint.toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
......@@ -110,15 +116,17 @@ std::string OutputStreamSocketZmq::receive(bool blocking) {
void OutputStreamSocketZmq::cancel() {
if (m_cancelSocket.get() != nullptr) {
std::string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, zmq::send_flags::sndmore);
m_cancelSocket->send(requestData, zmq::send_flags::none);
}
std::cout << "TODO OutputStreamSocketZmq::cancel" << std::endl;
// if (m_cancelSocket.get() != nullptr) {
// std::string data(message::Event::CANCEL);
// zmq::message_t requestType(data.length());
// zmq::message_t requestData(data.length());
// memcpy(requestType.data(), message::Event::CANCEL, data.length());
// memcpy(requestData.data(), data.c_str(), data.length());
// m_cancelSocket->send(requestType, zmq::send_flags::sndmore);
// m_cancelSocket->send(requestData, zmq::send_flags::none);
// }
}
void OutputStreamSocketZmq::close() {
......
......@@ -53,11 +53,11 @@ void SubscriberZmq::init(int appId, const Endpoint& appEndpoint, const Endpoint&
// Create a socket for publishing.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_subscriber.reset(new zmq::socket_t(contextImpl->getContext(), zmq::socket_type::sub));
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, std::string(message::Event::SYNC).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, std::string(message::Event::STREAM).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM_temp, std::string(message::Event::ENDSTREAM_temp).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, std::string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, std::string(message::Event::STATUS).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC.c_str(), std::string(message::Event::SYNC).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM.c_str(), std::string(message::Event::STREAM).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM_temp.c_str(), std::string(message::Event::ENDSTREAM_temp).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL.c_str(), std::string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS.c_str(), std::string(message::Event::STATUS).length());
m_subscriber->connect(appEndpoint.withPort(m_publisherPort).toString());
......
......@@ -17,6 +17,8 @@ package fr.ill.ics.cameo.base.impl.zmq;
import java.util.Arrays;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
......@@ -94,12 +96,12 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
public Event receive() {
String message = this.subscriberSocket.recvStr();
byte[] message = this.subscriberSocket.recv();
Event event = null;
// We can receive messages from the status publisher located in the server
// as well as messages from the cancel publisher located in the same process.
if (message.equals(Messages.Event.STATUS)) {
if (Arrays.equals(message, Messages.Event.STATUS)) {
byte[] statusMessage = this.subscriberSocket.recv();
......@@ -124,7 +126,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
throw new UnexpectedException("Cannot parse response");
}
}
else if (message.equals(Messages.Event.RESULT)) {
else if (Arrays.equals(message, Messages.Event.RESULT)) {
byte[] resultMessage = this.subscriberSocket.recv();
......@@ -144,7 +146,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
throw new UnexpectedException("Cannot parse response");
}
}
else if (message.equals(Messages.Event.KEYVALUE)) {
else if (Arrays.equals(message, Messages.Event.KEYVALUE)) {
byte[] keyValueMessage = this.subscriberSocket.recv();
......@@ -169,7 +171,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
throw new UnexpectedException("Cannot parse response");
}
}
else if (message.equals(Messages.Event.CANCEL)) {
else if (Arrays.equals(message, Messages.Event.CANCEL)) {
canceled = true;
return null;
}
......@@ -183,7 +185,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
public void cancel() {
cancelSocket.sendMore(Messages.Event.CANCEL);
cancelSocket.send(Messages.Event.CANCEL);
cancelSocket.send("cancel");
}
public void destroy() {
......
......@@ -17,6 +17,8 @@ package fr.ill.ics.cameo.base.impl.zmq;
import java.util.Arrays;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
......@@ -32,7 +34,7 @@ import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
import fr.ill.ics.cameo.strings.StringId;
import fr.ill.ics.cameo.strings.TopicId;
public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
......@@ -60,7 +62,7 @@ public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
subscriber.connect(endpoint.toString());
// Subscribe to the topic.
String topicId = StringId.from(Messages.Event.STREAM, name);
byte[] topicId = TopicId.from(Messages.Event.STREAM, name);
subscriber.subscribe(topicId);
String cancelEndpoint = "inproc://cancel." + CancelIdGenerator.newId();
......@@ -103,12 +105,12 @@ public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
public Application.Output receive() {
// Loop on recvStr() because in case of configuration multiple=yes, messages can come from different instances.
// Loop on recv() because in case of configuration multiple=yes, messages can come from different instances.
while (true) {
String messageType = this.subscriberSocket.recvStr();
byte[] messageType = this.subscriberSocket.recv();
// Cancel can only come from this instance.
if (messageType.equals(Messages.Event.CANCEL)) {
if (Arrays.equals(messageType, Messages.Event.CANCEL)) {
canceled = true;
return null;
}
......@@ -164,7 +166,7 @@ public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
public void cancel() {
cancelSocket.sendMore(Messages.Event.CANCEL);
cancelSocket.send(Messages.Event.CANCEL);
cancelSocket.send("cancel");
}
public void destroy() {
......
......@@ -209,7 +209,7 @@ public class PublisherZmq implements PublisherImpl {
if (!ended) {
publisher.sendMore(Messages.Event.ENDSTREAM_temp);
publisher.send(Messages.Event.ENDSTREAM_temp);
publisher.send("endstream");
ended = true;
}
......@@ -240,7 +240,7 @@ public class PublisherZmq implements PublisherImpl {
// send a dummy SYNC message by the publisher socket
publisher.sendMore(Messages.Event.SYNC);
publisher.send(Messages.Event.SYNC);
publisher.send("sync");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
......
......@@ -228,7 +228,7 @@ public class SubscriberZmq implements SubscriberImpl {
cancelPublisher.sendMore(Messages.Event.CANCEL);
cancelPublisher.send(Messages.Event.CANCEL);
cancelPublisher.send("cancel");
}
public static JSONObject createSubscribePublisherRequest() {
......
......@@ -112,8 +112,8 @@ public class Zmq {
return socket.connect(address);
}
public void subscribe(String topic) {
socket.subscribe(topic.getBytes());
public void subscribe(byte[] topic) {
socket.subscribe(topic);
}
public void send(String data) {
......
......@@ -114,8 +114,8 @@ public class Zmq {
return true;
}
public void subscribe(String topic) {
socket.subscribe(topic.getBytes());
public void subscribe(byte[] topic) {
socket.subscribe(topic);
}
public void send(String data) {
......
......@@ -50,13 +50,14 @@ public class Messages {
public static final long STREAM_END = 40;
public static class Event {
public static final String SYNC = "sync";
public static final String CANCEL = "cancel";
public static final String STREAM = "stream";
public static final String ENDSTREAM_temp = "endstream";
public static final String STATUS = "status";
public static final String RESULT = "result";
public static final String KEYVALUE = "keyvalue";
public static final byte[] SYNC = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x01 };
public static final byte[] CANCEL = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x02 };
public static final byte[] STREAM = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x03 };
public static final byte[] ENDSTREAM_temp = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x04 };
public static final byte[] STATUS = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x05 };
public static final byte[] RESULT = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x06 };
public static final byte[] KEYVALUE = new byte[] { (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x00, (byte)0x07 };
}
public static class ApplicationIdentity {
......
package fr.ill.ics.cameo.strings;
public class TopicId {
public static byte[] from(byte[] id, String name) {
byte[] nameBytes = name.getBytes();
byte[] result = new byte[id.length + nameBytes.length];
System.arraycopy(id, 0, result, 0, id.length);
System.arraycopy(nameBytes, 0, result, id.length, nameBytes.length);
return result;
}
}
......@@ -147,7 +147,7 @@ public class Manager extends ConfigLoader {
return streamPublishers.get(name);
}
public static void publishSynchronized(Zmq.Socket publisher, String topicId, byte[] data) {
public static void publishSynchronized(Zmq.Socket publisher, byte[] topicId, byte[] data) {
synchronized (publisher) {
publisher.sendMore(topicId);
......
......@@ -46,7 +46,7 @@ import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.server.Server.Version;
import fr.ill.ics.cameo.strings.ApplicationIdentity;
import fr.ill.ics.cameo.strings.Endpoint;
import fr.ill.ics.cameo.strings.StringId;
import fr.ill.ics.cameo.strings.TopicId;
/**
*
......@@ -88,7 +88,7 @@ public class RequestProcessor {
event.put(Messages.TYPE, Messages.SYNC_STREAM);
// Get the topic id.
String topicId = StringId.from(Messages.Event.STREAM, applicationName);
byte[] topicId = TopicId.from(Messages.Event.STREAM, applicationName);
// Synchronize the publisher as it is accessed by the stream threads.
Manager.publishSynchronized(publisher, topicId, Messages.serialize(event));
......
......@@ -29,7 +29,7 @@ import fr.ill.ics.cameo.manager.Application;
import fr.ill.ics.cameo.manager.Log;
import fr.ill.ics.cameo.manager.Manager;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.StringId;
import fr.ill.ics.cameo.strings.TopicId;
/**
* Class getting the stream from the process input stream.
......@@ -44,7 +44,7 @@ public class StreamApplicationThread extends ApplicationThread {
private boolean send = false;
private boolean eol;
private Zmq.Socket publisher;
private String topicId;
private byte[] topicId;
private FileOutputStream fileOutputStream;
/**
......@@ -62,7 +62,7 @@ public class StreamApplicationThread extends ApplicationThread {
// Memorize the string id.
// The topic name starts with the "stream" string rather than the application name.
// Indeed, the ZeroMQ filter applies on the prefix, so that "result" would conflict with an application name starting with "result" e.g. "resultcpp".
topicId = StringId.from(Messages.Event.STREAM, application.getName());
topicId = TopicId.from(Messages.Event.STREAM, application.getName());
}
private void sendMessage(String line, boolean endOfLine) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment