Commit 91ba2c06 authored by legoc's avatar legoc
Browse files

(split) Replaced protobuf by JSON in C++ API

parent 821e574d
1.0.0
-----
* Replaced protobuf by JSON.
0.2.2
-----
......
SUBDIRS = src
PROTO_PATH=$(top_srcdir)/src/proto
proto:
protoc -I=$(PROTO_PATH) --cpp_out=$(PROTO_PATH) $(PROTO_PATH)/Messages.proto
......@@ -26,26 +26,26 @@ libcameo_la_SOURCES = \
cameo/StarterServerException.cpp \
cameo/ResponderCreationException.cpp \
cameo/RequesterCreationException.cpp \
cameo/Response.cpp \
cameo/impl/StreamSocketImpl.cpp \
cameo/impl/RequestSocketImpl.cpp \
cameo/impl/SocketWaitingImpl.cpp \
cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \
cameo/impl/CancelIdGenerator.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/Response.cpp \
cameo/impl/HandlerImpl.cpp \
cameo/impl/ServicesImpl.cpp \
cameo/impl/PublisherImpl.cpp \
cameo/impl/SubscriberImpl.cpp \
cameo/impl/RequestImpl.cpp \
cameo/impl/ResponderImpl.cpp \
cameo/impl/RequesterImpl.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/EventThread.cpp \
cameo/Services.cpp \
cameo/impl/ServicesImpl.cpp \
cameo/Server.cpp \
cameo/impl/HandlerImpl.cpp \
cameo/Application.cpp \
cameo/impl/StreamSocketImpl.h \
cameo/impl/RequestSocketImpl.h \
......@@ -60,9 +60,7 @@ libcameo_la_SOURCES = \
cameo/impl/SubscriberImpl.h \
cameo/impl/RequestImpl.h \
cameo/impl/ResponderImpl.h \
cameo/impl/RequesterImpl.h \
proto/Messages.pb.cc \
proto/Messages.proto
cameo/impl/RequesterImpl.h
# header files that are installed
nobase_include_HEADERS = \
......@@ -70,7 +68,6 @@ nobase_include_HEADERS = \
cameo/Serializer.h \
cameo/TimeCondition.h \
cameo/Application.h \
cameo/ProtoType.h \
cameo/ConnectionTimeout.h \
cameo/Event.h \
cameo/EventThread.h \
......
......@@ -33,7 +33,8 @@
#include "impl/HandlerImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
#include "ProtoType.h"
#include "message/JSON.h"
#include "message/Message.h"
#include "Server.h"
#include "StarterServerException.h"
#include "StatusEvent.h"
......@@ -267,30 +268,30 @@ void This::cancelWaitings() {
int This::initUnmanagedApplication() {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_STARTEDUNMANAGED), m_impl->createStartedUnmanagedRequest(m_name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartedUnmanagedRequest(m_name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return requestResponse.value();
return response[message::RequestResponse::VALUE].GetInt();
}
void This::terminateUnmanagedApplication() {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_TERMINATEDUNMANAGED), m_impl->createTerminatedUnmanagedRequest(m_id));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
m_requestSocket->request(m_impl->createTerminatedUnmanagedRequest(m_id));
}
bool This::setRunning() {
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createRequestType(PROTO_SETSTATUS), m_instance.m_impl->createSetStatusRequest(m_instance.m_id, RUNNING));
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createSetStatusRequest(m_instance.m_id, RUNNING));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
if (requestResponse.value() == -1) {
int value = response[message::RequestResponse::VALUE].GetInt();
if (value == -1) {
return false;
}
......@@ -299,10 +300,7 @@ bool This::setRunning() {
void This::setBinaryResult(const std::string& data) {
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createRequestType(PROTO_SETRESULT), m_instance.m_impl->createSetResultRequest(m_instance.m_id, data));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
m_instance.m_requestSocket->request(m_instance.m_impl->createSetResultRequest(m_instance.m_id), data);
}
void This::setResult(const std::string& data) {
......@@ -312,63 +310,40 @@ void This::setResult(const std::string& data) {
setBinaryResult(resultData);
}
void This::setResult(const int32_t* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const int64_t* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const float* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const double* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
State This::getState(int id) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_GETSTATUS), m_impl->createGetStatusRequest(id));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object event;
json::parse(event, reply.get());
return protoStatus.applicationstate();
return event[message::StatusEvent::APPLICATION_STATE].GetInt();
}
bool This::destroyPublisher(const std::string& name) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_TERMINATEPUBLISHER), m_impl->createTerminatePublisherRequest(m_id, name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createTerminatePublisherRequest(m_id, name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
int value = response[message::RequestResponse::VALUE].GetInt();
int value = requestResponse.value();
return (value != -1);
}
bool This::removePort(const std::string& name) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_REMOVEPORT), m_impl->createRemovePortRequest(m_id, name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRemovePortRequest(m_id, name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
int value = requestResponse.value();
return (value != -1);
}
......@@ -683,42 +658,6 @@ bool Instance::getResult(std::string& result) {
return m_hasResult;
}
bool Instance::getResult(std::vector<int32_t>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<int64_t>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<float>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<double>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() {
return m_outputStreamSocket;
}
......@@ -773,16 +712,17 @@ Publisher::~Publisher() {
std::unique_ptr<Publisher> Publisher::create(const std::string& name, int numberOfSubscribers) {
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_CREATEPUBLISHER), This::m_instance.m_impl->createCreatePublisherRequest(This::m_instance.m_id, name, numberOfSubscribers));
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createCreatePublisherRequest(This::m_instance.m_id, name, numberOfSubscribers));
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int publisherPort = requestResponse.publisherport();
int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
if (publisherPort == -1) {
throw PublisherCreationException(requestResponse.message());
throw PublisherCreationException(response[message::PublisherResponse::MESSAGE].GetString());
}
int synchronizerPort = requestResponse.synchronizerport();
int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();;
return unique_ptr<Publisher>(new Publisher(&This::m_instance, publisherPort, synchronizerPort, name, numberOfSubscribers));
}
......@@ -820,22 +760,6 @@ void Publisher::send(const std::string& data) const {
m_impl->send(data);
}
void Publisher::send(const int32_t* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const int64_t* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const float* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const double* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string& data2) const {
m_impl->sendTwoBinaryParts(data1, data2);
}
......@@ -934,22 +858,6 @@ bool Subscriber::receive(std::string& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<int32_t>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<int64_t>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<float>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<double>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receiveTwoBinaryParts(std::string& data1, std::string& data2) const {
return m_impl->receiveTwoBinaryParts(data1, data2);
}
......@@ -1033,14 +941,15 @@ std::unique_ptr<Responder> Responder::create(const std::string& name) {
string portName = ResponderImpl::RESPONDER_PREFIX + name;
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_REQUESTPORT), This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, portName));
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, portName));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int responderPort = requestResponse.value();
int responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
throw ResponderCreationException(requestResponse.message());
throw ResponderCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
return unique_ptr<Responder>(new Responder(&This::m_instance, responderPort, name));
......@@ -1093,41 +1002,40 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
int requesterId = RequesterImpl::newRequesterId();
string requesterPortName = RequesterImpl::getRequesterPortName(name, responderId, requesterId);
string requestTypePart = This::m_instance.m_impl->createRequestType(PROTO_CONNECTPORT);
string requestDataPart = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
string request = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
unique_ptr<zmq::message_t> reply = instanceRequestSocket->request(request);
unique_ptr<zmq::message_t> reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
reply.reset();
int responderPort = requestResponse.value();
int responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
// Wait for the responder port.
instance.waitFor(0, responderPortName);
// Retry to connect.
reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
reply = instanceRequestSocket->request(request);
json::parse(response, reply.get());
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
responderPort = requestResponse.value();
responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
throw RequesterCreationException(requestResponse.message());
throw RequesterCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
reply.reset();
}
// Request a requester port.
reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_REQUESTPORT), This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, requesterPortName));
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, requesterPortName));
json::parse(response, reply.get());
int requesterPort = requestResponse.value();
int requesterPort = response[message::RequestResponse::VALUE].GetInt();
if (requesterPort == -1) {
throw RequesterCreationException(requestResponse.message());
throw RequesterCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
return unique_ptr<Requester>(new Requester(&This::m_instance, responderUrl, requesterPort, responderPort, name, responderId, requesterId));
......
......@@ -137,10 +137,6 @@ public:
*/
static void setBinaryResult(const std::string& data);
static void setResult(const std::string& data);
static void setResult(const int32_t* data, std::size_t size);
static void setResult(const int64_t* data, std::size_t size);
static void setResult(const float* data, std::size_t size);
static void setResult(const double* data, std::size_t size);
/**
* Connects to the starter application, i.e. the application which started this application.
......@@ -229,10 +225,6 @@ public:
bool getBinaryResult(std::string& result);
bool getResult(std::string& result);
bool getResult(std::vector<int32_t>& result);
bool getResult(std::vector<int64_t>& result);
bool getResult(std::vector<float>& result);
bool getResult(std::vector<double>& result);
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
......@@ -310,10 +302,6 @@ public:
void sendBinary(const std::string& data) const;
void send(const std::string& data) const;
void send(const int32_t* data, std::size_t size) const;
void send(const int64_t* data, std::size_t size) const;
void send(const float* data, std::size_t size) const;
void send(const double* data, std::size_t size) const;
void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
void sendEnd() const;
......@@ -365,10 +353,6 @@ public:
*/
bool receiveBinary(std::string& data) const;
bool receive(std::string& data) const;
bool receive(std::vector<int32_t>& data) const;
bool receive(std::vector<int64_t>& data) const;
bool receive(std::vector<float>& data) const;
bool receive(std::vector<double>& data) const;
bool receiveTwoBinaryParts(std::string& data1, std::string& data2) const;
void cancel();
......
......@@ -21,8 +21,9 @@
#include "PublisherEvent.h"
#include "ResultEvent.h"
#include "StatusEvent.h"
#include "../proto/Messages.pb.h"
#include "impl/StreamSocketImpl.h"
#include "message/JSON.h"
#include "message/Message.h"
using namespace std;
......@@ -49,39 +50,63 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
message = m_impl->receive();
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray(message->data(), message->size());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
return unique_ptr<Event>(new StatusEvent(protoStatus.id(), protoStatus.name(), protoStatus.applicationstate(), protoStatus.pastapplicationstates()));
int id = event[message::StatusEvent::ID].GetInt();
string name = event[message::StatusEvent::NAME].GetString();
application::State state = event[message::StatusEvent::APPLICATION_STATE].GetInt();
application::State pastStates = event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();
} else if (response == "RESULT") {
return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates));
}
else if (response == "RESULT") {
message = m_impl->receive();
proto::ResultEvent protoResult;
protoResult.ParseFromArray(message->data(), message->size());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
int id = event[message::ResultEvent::ID].GetInt();
string name = event[message::ResultEvent::NAME].GetString();
return unique_ptr<Event>(new ResultEvent(protoResult.id(), protoResult.name(), protoResult.data()));
// Get the data in the next part.
message = m_impl->receive();
string data(message->data<char>(), message->size());
} else if (response == "PUBLISHER") {
return unique_ptr<Event>(new ResultEvent(id, name, data));
}
else if (response == "PUBLISHER") {
message = m_impl->receive();
proto::PublisherEvent protoPublisher;
protoPublisher.ParseFromArray(message->data(), message->size());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
return unique_ptr<Event>(new PublisherEvent(protoPublisher.id(), protoPublisher.name(), protoPublisher.publishername()));
int id = event[message::PublisherEvent::ID].GetInt();
string name = event[message::PublisherEvent::NAME].GetString();
string publisherName = event[message::PublisherEvent::PUBLISHER_NAME].GetString();
} else if (response == "PORT") {
return unique_ptr<Event>(new PublisherEvent(id, name, publisherName));
}
else if (response == "PORT") {
message = m_impl->receive();
proto::PortEvent protoPort;
protoPort.ParseFromArray(message->data(), message->size());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
return unique_ptr<Event>(new PortEvent(protoPort.id(), protoPort.name(), protoPort.portname()));
int id = event[message::PortEvent::ID].GetInt();
string name = event[message::PortEvent::NAME].GetString();
string portName = event[message::PortEvent::PORT_NAME].GetString();
} else if (response == "CANCEL") {
return unique_ptr<Event>(new PortEvent(id, name, portName));
}
else if (response == "CANCEL") {
message = m_impl->receive();
......
......@@ -18,9 +18,10 @@
#include "impl/SocketWaitingImpl.h"
#include "impl/ServicesImpl.h"
#include "../proto/Messages.pb.h"
#include <iostream>
#include "impl/StreamSocketImpl.h"
#include "message/JSON.h"
#include "message/Message.h"
#include <iostream>
using namespace std;
......@@ -51,7 +52,7 @@ bool OutputStreamSocket::receive(Output& output) {
unique_ptr<zmq::message_t> message(m_impl->receive());
string response(static_cast<char*>(message->data()), message->size());
string response(message->data<char>(), message->size());
if (response == ServicesImpl::STREAM) {
}
......@@ -66,11 +67,15 @@ bool OutputStreamSocket::receive(Output& output) {
message = m_impl->receive();
proto::ApplicationStream protoStream;
protoStream.ParseFromArray(message->data(), message->size());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
int id = event[message::ApplicationStream::ID].GetInt();
string line = event[message::ApplicationStream::MESSAGE].GetString();
output.m_id = protoStream.id();
output.m_message = protoStream.message();
output.m_id = id;
output.m_message = line;
return true;
}
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_PROTOTYPE_H_
#define CAMEO_PROTOTYPE_H_
namespace cameo {
enum ProtoType {