Commit 1bdc12c7 authored by legoc's avatar legoc
Browse files

(split2) Merged protobuf-to-json branch

parents e515eeaa ba2df304
1.0.0
-----
* Replaced protobuf by JSON.
0.3.3
-----
......
SUBDIRS = src
PROTO_PATH=$(top_srcdir)/src/proto
proto:
protoc -I=$(PROTO_PATH) --cpp_out=$(PROTO_PATH) $(PROTO_PATH)/Messages.proto
......@@ -4,8 +4,8 @@
#
# -----------------------------------------------------------------------------
AC_INIT(cameo-api-cpp, 0.3.3)
LIBRARY_VERSION=0:3:3
AC_INIT(cameo-api-cpp, 1.0.0)
LIBRARY_VERSION=1:0:0
AC_CONFIG_AUX_DIR(config)
AC_CONFIG_SRCDIR(src/cameo/Application.cpp)
......@@ -25,16 +25,10 @@ AC_SUBST(LIBRARY_VERSION)
AX_CXX_COMPILE_STDCXX_11
AC_ZMQ
AC_PROTOBUF_LITE
CAMEO_CXXFLAGS="$ZMQ_CFLAGS \
$PROTOBUF_CFLAGS"
CAMEO_LDFLAGS="$ZMQ_LDFLAGS \
$PROTOBUF_LDFLAGS"
CAMEO_LIBS="$ZMQ_LIB \
$PROTOBUF_LIB"
CAMEO_CXXFLAGS="$ZMQ_CFLAGS"
CAMEO_LDFLAGS="$ZMQ_LDFLAGS"
CAMEO_LIBS="$ZMQ_LIB"
AC_SUBST(CAMEO_CXXFLAGS)
AC_SUBST(CAMEO_LDFLAGS)
......
###############################################################################
# Version 29/04/2019
# Version 18/03/2020
# defines CAMEO_CFLAGS, CAMEO_LDFLAGS, CAMEO_LIBS
#
AC_DEFUN([AC_CAMEO],
......@@ -10,7 +10,6 @@ AC_DEFUN([AC_CAMEO],
[with_cameo=yes])
AC_LIBZMQ
AC_LIBPROTOBUF_LITE
CAMEO_CFLAGS=
CAMEO_LIBS=
......@@ -18,7 +17,7 @@ AC_DEFUN([AC_CAMEO],
if test $with_cameo != yes; then
cameo_possible_path="$with_cameo"
else
cameo_possible_path="/usr/local /usr /opt /var"
cameo_possible_path="/usr /opt /var /usr/local"
fi
AC_MSG_CHECKING([for cameo headers])
cameo_save_CXXFLAGS="$CXXFLAGS"
......@@ -44,7 +43,7 @@ AC_DEFUN([AC_CAMEO],
CXXFLAGS="$CXXFLAGS $CAMEO_CFLAGS"
# search for library
LIBS="$LIBS $CAMEO_LIBS $LIBZMQ_LDFLAGS $LIBPROTOBUF_LDFLAGS $LIBZMQ_LIB $LIBPROTOBUF_LIB -lcameo"
LIBS="$LIBS $CAMEO_LIBS $LIBZMQ_LDFLAGS $LIBZMQ_LIB -lcameo"
AC_LINK_IFELSE([AC_LANG_PROGRAM([[]],
[[]])],
......@@ -56,8 +55,8 @@ AC_DEFUN([AC_CAMEO],
HAVE_CAMEO=1
LIBS="$cameo_save_LIBS"
CAMEO_LDFLAGS="$LIBZMQ_LDFLAGS $LIBPROTOBUF_LDFLAGS $CAMEO_LIBS"
CAMEO_LIBS="-lcameo $LIBZMQ_LIB $LIBPROTOBUF_LIB -pthread"
CAMEO_LDFLAGS="$LIBZMQ_LDFLAGS $CAMEO_LIBS"
CAMEO_LIBS="-lcameo $LIBZMQ_LIB -pthread"
fi
if test $cameo_found = yes; then
......
###############################################################################
# Version 01/07/2015
# defines LIBPROTOBUF_LDFLAGS, LIBPROTOBUF_LIB
#
AC_DEFUN([AC_LIBPROTOBUF_LITE],
[
AC_ARG_WITH([protobuf],
AS_HELP_STRING([--with-protobuf=PREFIX],[Specify protobuf library location]),
[],
[with_protobuf=yes])
PROTOBUF_LIBS=
protobuf_save_LIBS="$LIBS"
if test $with_protobuf != no; then
if test $with_protobuf != yes; then
protobuf_possible_path="$with_protobuf"
else
protobuf_possible_path="/usr/local /usr /opt /var"
fi
AC_MSG_CHECKING([for protobuf -lprotobuf-lite])
protobuf_found=no
for protobuf_path_tmp in $protobuf_possible_path ; do
LIBS="$LIBS $PROTOBUF_LIBS -lprotobuf-lite"
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[]],[[]])],
[PROTOBUF_LIBS="-L$protobuf_path_tmp/lib"
protobuf_found=yes]
[])
if test $protobuf_found = yes; then
break;
fi
done
if test $protobuf_found = yes; then
LIBPROTOBUF_LDFLAGS="$PROTOBUF_LIBS"
LIBPROTOBUF_LIB="-lprotobuf-lite"
AC_MSG_RESULT(yes)
AC_SUBST(LIBPROTOBUF_LDFLAGS)
AC_SUBST(LIBPROTOBUF_LIB)
else
AC_MSG_RESULT(no)
fi
fi
LIBS="$protobuf_save_LIBS"
])
###############################################################################
# Version 01/07/2015
# Version 18/03/2020
# defines LIBZMQ_LDFLAGS, LIBZMQ_LIB
#
AC_DEFUN([AC_LIBZMQ],
......@@ -15,7 +15,7 @@ AC_DEFUN([AC_LIBZMQ],
if test $with_zmq != yes; then
zeromq_possible_path="$with_zmq"
else
zeromq_possible_path="/usr/local /usr /opt /var"
zeromq_possible_path="/usr /opt /var /usr/local"
fi
AC_MSG_CHECKING([for zeromq -lzmq])
zeromq_found=no
......
###############################################################################
# Version 01/07/2015
# defines PROTOBUF_CFLAGS, PROTOBUF_LDFLAGS, PROTOBUF_LIB
#
AC_DEFUN([AC_PROTOBUF_LITE],
[ AC_ARG_WITH([protobuf],
AS_HELP_STRING([--with-protobuf=PREFIX],[Specify protobuf library location]),
[],
[with_protobuf=yes])
PROTOBUF_CFLAGS=
PROTOBUF_LIBS=
if test $with_protobuf != no; then
if test $with_protobuf != yes; then
protobuf_possible_path="$with_protobuf"
else
protobuf_possible_path="/usr/local /usr /opt /var"
fi
AC_MSG_CHECKING([for protobuf headers])
protobuf_save_CXXFLAGS="$CXXFLAGS"
protobuf_found=no
for protobuf_path_tmp in $protobuf_possible_path ; do
# test include
CXXFLAGS="$CXXFLAGS -I$protobuf_path_tmp/include"
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[#include <google/protobuf/descriptor.h>]],[[]])],
[PROTOBUF_CFLAGS="-I$protobuf_path_tmp/include"
PROTOBUF_LIBS="-L$protobuf_path_tmp/lib"
protobuf_found=yes]
[])
CXXFLAGS="$protobuf_save_CXXFLAGS"
if test $protobuf_found = yes; then
break;
fi
done
AC_MSG_RESULT($protobuf_found)
if test $protobuf_found = yes; then
AC_MSG_CHECKING([for protobuf -lprotobuf-lite])
protobuf_save_LIBS="$LIBS"
CXXFLAGS="$CXXFLAGS $PROTOBUF_CFLAGS"
# search for library
LIBS="$LIBS $PROTOBUF_LIBS -lprotobuf-lite"
AC_LINK_IFELSE([AC_LANG_PROGRAM([[#include <google/protobuf/descriptor.h>]],
[[]])],
[ protobuf_found=yes],
[ protobuf_found=no])
CXXFLAGS="$protobuf_save_CXXFLAGS"
LIBS="$protobuf_save_LIBS"
if test $protobuf_found = yes; then
HAVE_PROTOBUF=1
LIBS="$protobuf_save_LIBS"
PROTOBUF_LDFLAGS="$PROTOBUF_LIBS"
PROTOBUF_LIB="-lprotobuf-lite"
break;
fi
if test $protobuf_found = yes; then
AC_MSG_RESULT(yes)
AC_SUBST(PROTOBUF_CFLAGS)
AC_SUBST(PROTOBUF_LDFLAGS)
AC_SUBST(PROTOBUF_LIB)
else
AC_MSG_RESULT(no)
fi
fi
fi
])
......@@ -6,6 +6,7 @@ lib_LTLIBRARIES = libcameo.la
# header files that must be in the dist package but not installed are sources
libcameo_la_SOURCES = \
cameo/message/JSON.cpp \
cameo/Serializer.cpp \
cameo/TimeCondition.cpp \
cameo/Event.cpp \
......@@ -25,26 +26,26 @@ libcameo_la_SOURCES = \
cameo/StarterServerException.cpp \
cameo/ResponderCreationException.cpp \
cameo/RequesterCreationException.cpp \
cameo/Response.cpp \
cameo/impl/StreamSocketImpl.cpp \
cameo/impl/RequestSocketImpl.cpp \
cameo/impl/SocketWaitingImpl.cpp \
cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \
cameo/impl/CancelIdGenerator.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/Response.cpp \
cameo/impl/HandlerImpl.cpp \
cameo/impl/ServicesImpl.cpp \
cameo/impl/PublisherImpl.cpp \
cameo/impl/SubscriberImpl.cpp \
cameo/impl/RequestImpl.cpp \
cameo/impl/ResponderImpl.cpp \
cameo/impl/RequesterImpl.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/EventThread.cpp \
cameo/Services.cpp \
cameo/impl/ServicesImpl.cpp \
cameo/Server.cpp \
cameo/impl/HandlerImpl.cpp \
cameo/Application.cpp \
cameo/impl/StreamSocketImpl.h \
cameo/impl/RequestSocketImpl.h \
......@@ -59,16 +60,14 @@ libcameo_la_SOURCES = \
cameo/impl/SubscriberImpl.h \
cameo/impl/RequestImpl.h \
cameo/impl/ResponderImpl.h \
cameo/impl/RequesterImpl.h \
proto/Messages.pb.cc \
proto/Messages.proto
cameo/impl/RequesterImpl.h
# header files that are installed
nobase_include_HEADERS = \
cameo/message/JSON.h \
cameo/Serializer.h \
cameo/TimeCondition.h \
cameo/Application.h \
cameo/ProtoType.h \
cameo/ConnectionTimeout.h \
cameo/Event.h \
cameo/EventThread.h \
......
......@@ -33,7 +33,8 @@
#include "impl/HandlerImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
#include "ProtoType.h"
#include "message/JSON.h"
#include "message/Message.h"
#include "Server.h"
#include "StarterServerException.h"
#include "StatusEvent.h"
......@@ -267,30 +268,30 @@ void This::cancelWaitings() {
int This::initUnmanagedApplication() {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_STARTEDUNMANAGED), m_impl->createStartedUnmanagedRequest(m_name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartedUnmanagedRequest(m_name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return requestResponse.value();
return response[message::RequestResponse::VALUE].GetInt();
}
void This::terminateUnmanagedApplication() {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_TERMINATEDUNMANAGED), m_impl->createTerminatedUnmanagedRequest(m_id));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
m_requestSocket->request(m_impl->createTerminatedUnmanagedRequest(m_id));
}
bool This::setRunning() {
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createRequestType(PROTO_SETSTATUS), m_instance.m_impl->createSetStatusRequest(m_instance.m_id, RUNNING));
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createSetStatusRequest(m_instance.m_id, RUNNING));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
if (requestResponse.value() == -1) {
int value = response[message::RequestResponse::VALUE].GetInt();
if (value == -1) {
return false;
}
......@@ -299,10 +300,7 @@ bool This::setRunning() {
void This::setBinaryResult(const std::string& data) {
unique_ptr<zmq::message_t> reply = m_instance.m_requestSocket->request(m_instance.m_impl->createRequestType(PROTO_SETRESULT), m_instance.m_impl->createSetResultRequest(m_instance.m_id, data));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
m_instance.m_requestSocket->request(m_instance.m_impl->createSetResultRequest(m_instance.m_id), data);
}
void This::setResult(const std::string& data) {
......@@ -312,63 +310,40 @@ void This::setResult(const std::string& data) {
setBinaryResult(resultData);
}
void This::setResult(const int32_t* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const int64_t* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const float* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
void This::setResult(const double* data, std::size_t size) {
string resultData;
serialize(data, size, resultData);
setBinaryResult(resultData);
}
State This::getState(int id) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_GETSTATUS), m_impl->createGetStatusRequest(id));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object event;
json::parse(event, reply.get());
return protoStatus.applicationstate();
return event[message::StatusEvent::APPLICATION_STATE].GetInt();
}
bool This::destroyPublisher(const std::string& name) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_TERMINATEPUBLISHER), m_impl->createTerminatePublisherRequest(m_id, name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createTerminatePublisherRequest(m_id, name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
int value = response[message::RequestResponse::VALUE].GetInt();
int value = requestResponse.value();
return (value != -1);
}
bool This::removePort(const std::string& name) const {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_REMOVEPORT), m_impl->createRemovePortRequest(m_id, name));
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRemovePortRequest(m_id, name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
int value = requestResponse.value();
return (value != -1);
}
......@@ -683,42 +658,6 @@ bool Instance::getResult(std::string& result) {
return m_hasResult;
}
bool Instance::getResult(std::vector<int32_t>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<int64_t>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<float>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
bool Instance::getResult(std::vector<double>& result) {
string bytes;
getBinaryResult(bytes);
parse(bytes, result);
return m_hasResult;
}
std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() {
return m_outputStreamSocket;
}
......@@ -773,16 +712,17 @@ Publisher::~Publisher() {
std::unique_ptr<Publisher> Publisher::create(const std::string& name, int numberOfSubscribers) {
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_CREATEPUBLISHER), This::m_instance.m_impl->createCreatePublisherRequest(This::m_instance.m_id, name, numberOfSubscribers));
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createCreatePublisherRequest(This::m_instance.m_id, name, numberOfSubscribers));
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int publisherPort = requestResponse.publisherport();
int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
if (publisherPort == -1) {
throw PublisherCreationException(requestResponse.message());
throw PublisherCreationException(response[message::PublisherResponse::MESSAGE].GetString());
}
int synchronizerPort = requestResponse.synchronizerport();
int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();;
return unique_ptr<Publisher>(new Publisher(&This::m_instance, publisherPort, synchronizerPort, name, numberOfSubscribers));
}
......@@ -820,22 +760,6 @@ void Publisher::send(const std::string& data) const {
m_impl->send(data);
}
void Publisher::send(const int32_t* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const int64_t* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const float* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::send(const double* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string& data2) const {
m_impl->sendTwoBinaryParts(data1, data2);
}
......@@ -934,22 +858,6 @@ bool Subscriber::receive(std::string& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<int32_t>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<int64_t>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<float>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receive(std::vector<double>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receiveTwoBinaryParts(std::string& data1, std::string& data2) const {
return m_impl->receiveTwoBinaryParts(data1, data2);
}
......@@ -1052,14 +960,15 @@ std::unique_ptr<Responder> Responder::create(const std::string& name) {
string portName = ResponderImpl::RESPONDER_PREFIX + name;
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_REQUESTPORT), This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, portName));
unique_ptr<zmq::message_t> reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, portName));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int responderPort = requestResponse.value();
int responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
throw ResponderCreationException(requestResponse.message());
throw ResponderCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
return unique_ptr<Responder>(new Responder(&This::m_instance, responderPort, name));
......@@ -1112,41 +1021,40 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
int requesterId = RequesterImpl::newRequesterId();
string requesterPortName = RequesterImpl::getRequesterPortName(name, responderId, requesterId);
string requestTypePart = This::m_instance.m_impl->createRequestType(PROTO_CONNECTPORT);
string requestDataPart = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
string request = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
unique_ptr<zmq::message_t> reply = instanceRequestSocket->request(request);
unique_ptr<zmq::message_t> reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
reply.reset();
int responderPort = requestResponse.value();
int responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
// Wait for the responder port.
instance.waitFor(0, responderPortName);
// Retry to connect.
reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
reply = instanceRequestSocket->request(request);
json::parse(response, reply.get());
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
responderPort = requestResponse.value();
responderPort = response[message::RequestResponse::VALUE].GetInt();
if (responderPort == -1) {
throw RequesterCreationException(requestResponse.message());
throw RequesterCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
reply.reset();
}
// Request a requester port.
reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestType(PROTO_REQUESTPORT), This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, requesterPortName));
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
reply = This::m_instance.m_requestSocket->request(This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, requesterPortName));
json::parse(response, reply.get());
int requesterPort = requestResponse.value();
int requesterPort = response[message::RequestResponse::VALUE].GetInt();
if (requesterPort == -1) {
throw RequesterCreationException(requestResponse.message());
throw RequesterCreationException(response[message::RequestResponse::MESSAGE].GetString());
}
return unique_ptr<Requester>(new Requester(&This::m_instance, responderUrl, requesterPort, responderPort, name, responderId, requesterId));
......
......@@ -137,10 +137,6 @@ public:
*/
static void setBinaryResult(const std::string& data);