Commit 4c273573 authored by yannick legoc's avatar yannick legoc
Browse files
parents 7135262a e6377608
0.1.3
-----
0.1.2
-----
* Enabled to define more than one requester on the same responder in one Application instance.
* Removed zmq.hpp as it should be installed.
* Corrected some memory leaks by changing the return type of tryRequestWithOnePartReply.
0.1.0
-----
......
......@@ -4,8 +4,8 @@
#
# -----------------------------------------------------------------------------
AC_INIT(cameo-api-cpp, 0.1.2-dev)
LIBRARY_VERSION=0:1:2
AC_INIT(cameo-api-cpp, 0.1.3-dev)
LIBRARY_VERSION=0:1:3
AC_CONFIG_AUX_DIR(config)
AC_CONFIG_SRCDIR(src/cameo/Application.cpp)
......
......@@ -261,11 +261,10 @@ int This::initUnmanagedApplication() {
string strRequestType = m_impl->createRequest(PROTO_STARTEDUNMANAGED);
string strRequestData = m_impl->createStartedUnmanagedRequest(m_name);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
return requestResponse.value();
}
......@@ -274,22 +273,20 @@ void This::terminateUnmanagedApplication() {
string strRequestType = m_impl->createRequest(PROTO_TERMINATEDUNMANAGED);
string strRequestData = m_impl->createTerminatedUnmanagedRequest(m_id);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
bool This::setRunning() {
string strRequestType = m_instance.m_impl->createRequest(PROTO_SETSTATUS);
string strRequestData = m_instance.m_impl->createSetStatusRequest(m_instance.m_id, RUNNING);
zmq::message_t* reply = m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_instance.m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_instance.m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
if (requestResponse.value() == -1) {
return false;
......@@ -303,10 +300,9 @@ void This::setBinaryResult(const std::string& data) {
string strRequestType = m_instance.m_impl->createRequest(PROTO_SETRESULT);
string strRequestData = m_instance.m_impl->createSetResultRequest(m_instance.m_id, data);
zmq::message_t* reply = m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_instance.m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_instance.m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
if (requestResponse.value() == -1) {
//throw ?;
......@@ -353,11 +349,10 @@ State This::getState(int id) const {
string strRequestType = m_impl->createRequest(PROTO_GETSTATUS);
string strRequestData = m_impl->createGetStatusRequest(id);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
return protoStatus.applicationstate();
}
......@@ -366,11 +361,10 @@ bool This::destroyPublisher(const std::string& name) const {
string strRequestType = m_impl->createRequest(PROTO_TERMINATEPUBLISHER);
string strRequestData = m_impl->createTerminatePublisherRequest(m_id, name);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int value = requestResponse.value();
return (value != -1);
......@@ -380,11 +374,10 @@ bool This::removePort(const std::string& name) const {
string strRequestType = m_impl->createRequest(PROTO_REMOVEPORT);
string strRequestData = m_impl->createRemovePortRequest(m_id, name);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int value = requestResponse.value();
return (value != -1);
......@@ -760,10 +753,9 @@ std::auto_ptr<Publisher> Publisher::create(const std::string& name, int numberOf
string strRequestType = This::m_instance.m_impl->createRequest(PROTO_CREATEPUBLISHER);
string strRequestData = This::m_instance.m_impl->createCreatePublisherRequest(This::m_instance.m_id, name, numberOfSubscribers);
zmq::message_t* reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, This::m_instance.m_serverEndpoint);
auto_ptr<zmq::message_t> reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, This::m_instance.m_serverEndpoint);
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int publisherPort = requestResponse.publisherport();
if (publisherPort == -1) {
......@@ -1011,10 +1003,9 @@ std::auto_ptr<Responder> Responder::create(const std::string& name) {
string strRequestType = This::m_instance.m_impl->createRequest(PROTO_REQUESTPORT);
string strRequestData = This::m_instance.m_impl->createRequestPortRequest(This::m_instance.m_id, portName);
zmq::message_t* reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, This::m_instance.m_serverEndpoint);
auto_ptr<zmq::message_t> reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, This::m_instance.m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int responderPort = requestResponse.value();
if (responderPort == -1) {
......@@ -1071,10 +1062,10 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
string strRequestType = This::m_instance.m_impl->createRequest(PROTO_CONNECTPORT);
string strRequestData = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
zmq::message_t* reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, responderEndpoint);
auto_ptr<zmq::message_t> reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, responderEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
reply.reset();
int responderPort = requestResponse.value();
if (responderPort == -1) {
......@@ -1084,12 +1075,13 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
// Retry to connect.
reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, responderEndpoint);
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
responderPort = requestResponse.value();
if (responderPort == -1) {
throw RequesterCreationException(requestResponse.message());
}
reply.reset();
}
// Request a requester port
......@@ -1098,7 +1090,6 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, This::m_instance.m_serverEndpoint);
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int requesterPort = requestResponse.value();
if (requesterPort == -1) {
......
......@@ -106,11 +106,10 @@ std::auto_ptr<application::Instance> Server::start(const std::string& name, cons
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
try {
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
if (requestResponse.value() == -1) {
instance->setErrorMessage(requestResponse.message());
......@@ -138,11 +137,10 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
strRequestData = m_impl->createStopRequest(id);
}
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
return Response(requestResponse.value(), requestResponse.message());
}
......@@ -176,11 +174,10 @@ application::InstanceArray Server::connectAll(const std::string& name) {
string strRequestType = m_impl->createRequest(PROTO_CONNECT);
string strRequestData = m_impl->createConnectRequest(name);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
// allocate the array
instances.allocate(response.applicationinfo_size());
......@@ -249,11 +246,10 @@ bool Server::isAlive(int id) const {
string strRequestType = m_impl->createRequest(PROTO_ISALIVE);
string strRequestData = m_impl->createIsAliveRequest(id);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::IsAliveResponse isAliveResponse;
isAliveResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
return isAliveResponse.isalive();
}
......@@ -264,11 +260,10 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
string strRequestType = m_impl->createRequest(PROTO_ALLAVAILABLE);
string strRequestData = m_impl->createAllAvailableRequest();
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::AllAvailableResponse allAvailableResponse;
allAvailableResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
for (int i = 0; i < allAvailableResponse.applicationconfig_size(); ++i) {
proto::ApplicationConfig config = allAvailableResponse.applicationconfig(i);
......@@ -293,11 +288,10 @@ std::vector<application::Info> Server::getApplicationInfos() const {
string strRequestType = m_impl->createRequest(PROTO_SHOWALL);
string strRequestData = m_impl->createShowAllRequest();
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
for (int i = 0; i < response.applicationinfo_size(); ++i) {
proto::ApplicationInfo info = response.applicationinfo(i);
......@@ -338,10 +332,9 @@ std::auto_ptr<application::Subscriber> Server::createSubscriber(int id, const st
string strRequestType = m_impl->createRequest(PROTO_CONNECTPUBLISHER);
string strRequestData = m_impl->createConnectPublisherRequest(id, publisherName);
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
int publisherPort = requestResponse.publisherport();
if (publisherPort == -1) {
......
......@@ -99,11 +99,10 @@ void Services::initStatus() {
// get the status port
string strRequestType = m_impl->createRequest(PROTO_STATUS);
string strRequestData = m_impl->createShowStatusRequest();
zmq::message_t* reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
auto_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
// reply ok
if (requestResponse.value() == -1) {
......
......@@ -147,11 +147,10 @@ void PublisherImpl::cancelWaitForSubscribers() {
proto::CancelPublisherSyncCommand cancelPublisherSyncCommand;
cancelPublisherSyncCommand.SerializeToString(&strRequestData);
zmq::message_t* reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
auto_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
WaitingImpl * PublisherImpl::waiting() {
......
......@@ -88,11 +88,10 @@ void RequesterImpl::sendBinary(const std::string& request) {
requestCommand.set_requesterport(m_requesterPort);
requestCommand.SerializeToString(&strRequestData);
zmq::message_t* reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
auto_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
void RequesterImpl::send(const std::string& request) {
......@@ -118,11 +117,10 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
requestCommand.set_requesterport(m_requesterPort);
requestCommand.SerializeToString(&strRequestData);
zmq::message_t* reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
auto_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
bool RequesterImpl::receiveBinary(std::string& response) {
......@@ -184,11 +182,10 @@ void RequesterImpl::cancel() {
string strRequestType = m_application->m_impl->createRequest(PROTO_CANCEL);
string strRequestData = "cancel";
zmq::message_t* reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, requesterEndpoint.str());
auto_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, requesterEndpoint.str());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
void RequesterImpl::terminate() {
......
......@@ -56,11 +56,10 @@ void ResponderImpl::cancel() {
string strRequestType = m_application->m_impl->createRequest(PROTO_CANCEL);
string strRequestData = "cancel";
zmq::message_t* reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
auto_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
WaitingImpl * ResponderImpl::waiting() {
......
......@@ -82,7 +82,7 @@ std::string ServicesImpl::createStartRequest(const std::string& name, const std:
return strRequestStart;
}
zmq::message_t* ServicesImpl::tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
std::auto_ptr<zmq::message_t> ServicesImpl::tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
try {
......@@ -127,8 +127,8 @@ zmq::message_t* ServicesImpl::tryRequestWithOnePartReply(const std::string& strR
}
}
zmq::message_t *reply = new zmq::message_t;
socket.recv(reply, 0);
auto_ptr<zmq::message_t> reply(new zmq::message_t());
socket.recv(reply.get(), 0);
return reply;
}
......@@ -331,10 +331,9 @@ std::string ServicesImpl::createTerminatedUnmanagedRequest(int id) const {
bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout) {
try {
zmq::message_t *reply = 0;
reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.c_str(), timeout);
auto_ptr<zmq::message_t> reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.c_str(), timeout);
if (reply != 0) {
if (reply.get() != 0) {
return true;
}
......@@ -367,12 +366,13 @@ void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, const std::stri
}
void ServicesImpl::subscribeToPublisher(const std::string& endpoint) {
string strRequestType = createRequest(PROTO_SUBSCRIBEPUBLISHER);
string strRequestData = createSubscribePublisherRequest();
zmq::message_t* reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint);
auto_ptr<zmq::message_t> reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
delete reply;
}
/**
......
......@@ -18,9 +18,10 @@
#define CAMEO_SERVICESIMPL_H_
#include "../../proto/Messages.pb.h"
#include "../ProtoType.h"
#include <vector>
#include <memory>
#include "zmq.hpp"
#include "../ProtoType.h"
namespace cameo {
......@@ -63,7 +64,7 @@ public:
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::message_t * tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::auto_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
proto::MessageType_Type convertToProtoType(ProtoType type) const;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment