Commit 5016da24 authored by legoc's avatar legoc
Browse files

Added two binary parts message for publisher/subscriber.

parent 8d4b64d6
......@@ -8,4 +8,5 @@
* Added SocketException thrown when the Server connect fails.
* Renamed ERROR into FAILURE.
* Implementation of unmanaged applications.
* Implemented This static instance without pointer so that it is not necessary to call This::terminate() except if the destruction of the static instance is not automatic.
\ No newline at end of file
* Implemented This static instance without pointer so that it is not necessary to call This::terminate() except if the destruction of the static instance is not automatic.
* Added two binary parts message for publisher/subscriber.
\ No newline at end of file
......@@ -6,7 +6,7 @@ lib_LTLIBRARIES = libcameo.la
# header files that must be in the dist package but not installed are sources
libcameo_la_SOURCES = \
cameo/impl/Serializer.cpp \
cameo/Serializer.cpp \
cameo/TimeCondition.cpp \
cameo/Event.cpp \
cameo/StatusEvent.cpp \
......@@ -63,6 +63,7 @@ libcameo_la_SOURCES = \
# header files that are installed
nobase_include_HEADERS = \
cameo/Serializer.h \
cameo/TimeCondition.h \
cameo/Application.h \
cameo/ProtoType.h \
......
......@@ -27,7 +27,6 @@
#include "impl/RequesterImpl.h"
#include "impl/RequestImpl.h"
#include "impl/ResponderImpl.h"
#include "impl/Serializer.h"
#include "impl/SocketImpl.h"
#include "impl/SubscriberImpl.h"
#include "impl/WaitingImpl.h"
......@@ -817,6 +816,10 @@ void Publisher::send(const double* data, std::size_t size) const {
m_impl->send(data, size);
}
void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string& data2) const {
m_impl->sendTwoBinaryParts(data1, data2);
}
bool Publisher::hasEnded() const {
return m_impl->hasEnded();
}
......@@ -915,6 +918,10 @@ bool Subscriber::receive(std::vector<double>& data) const {
return m_impl->receive(data);
}
bool Subscriber::receiveTwoBinaryParts(std::string& data1, std::string& data2) const {
return m_impl->receiveTwoBinaryParts(data1, data2);
}
void Subscriber::cancel() {
m_waiting->cancel();
}
......
......@@ -31,6 +31,7 @@
#include "RequesterCreationException.h"
#include "ResponderCreationException.h"
#include "Response.h"
#include "Serializer.h"
#include "Services.h"
#include "TimeCondition.h"
......@@ -294,6 +295,7 @@ public:
void send(const int64_t* data, std::size_t size) const;
void send(const float* data, std::size_t size) const;
void send(const double* data, std::size_t size) const;
void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
void sendEnd() const;
bool hasEnded() const;
......@@ -334,6 +336,7 @@ public:
bool receive(std::vector<int64_t>& data) const;
bool receive(std::vector<float>& data) const;
bool receive(std::vector<double>& data) const;
bool receiveTwoBinaryParts(std::string& data1, std::string& data2) const;
void cancel();
......
......@@ -15,8 +15,7 @@
*/
#include "Serializer.h"
#include "../../proto/Messages.pb.h"
#include "../proto/Messages.pb.h"
namespace cameo {
......@@ -137,4 +136,4 @@ void parse(const std::string& data, std::vector<double>& result) {
}
}
}
\ No newline at end of file
}
......@@ -19,8 +19,8 @@
#include <boost/bind.hpp>
#include <sstream>
#include "../Application.h"
#include "../Serializer.h"
#include "ApplicationImpl.h"
#include "Serializer.h"
using namespace std;
using namespace boost;
......@@ -215,6 +215,12 @@ void PublisherImpl::send(const double* data, std::size_t size) {
publish(STREAM, result.c_str(), result.length());
}
void PublisherImpl::sendTwoBinaryParts(const std::string& data1, const std::string& data2) {
// send a STREAM message by the publisher socket
publishTwoParts(STREAM, data1.c_str(), data1.length(), data2.c_str(), data2.length());
}
void PublisherImpl::setEnd() {
if (!m_ended && m_publisher.get() != 0) {
......@@ -246,13 +252,31 @@ void PublisherImpl::terminate() {
void PublisherImpl::publish(const std::string& header, const char* data, std::size_t size) {
zmq::message_t requestType(header.length());
zmq::message_t requestData(size);
memcpy((void *) requestType.data(), header.c_str(), header.length());
zmq::message_t requestData(size);
memcpy((void *) requestData.data(), data, size);
m_publisher->send(requestType, ZMQ_SNDMORE);
m_publisher->send(requestData);
}
void PublisherImpl::publishTwoParts(const std::string& header, const char* data1, std::size_t size1, const char* data2, std::size_t size2) {
zmq::message_t requestType(header.length());
memcpy((void *) requestType.data(), header.c_str(), header.length());
zmq::message_t requestData1(size1);
memcpy((void *) requestData1.data(), data1, size1);
zmq::message_t requestData2(size2);
memcpy((void *) requestData2.data(), data2, size2);
m_publisher->send(requestType, ZMQ_SNDMORE);
m_publisher->send(requestData1, ZMQ_SNDMORE);
m_publisher->send(requestData2);
}
zmq::message_t * PublisherImpl::processInitCommand() {
// send a dummy SYNC message by the publisher socket
......
......@@ -51,11 +51,14 @@ public:
void send(const int64_t* data, std::size_t size);
void send(const float* data, std::size_t size);
void send(const double* data, std::size_t size);
void sendTwoBinaryParts(const std::string& data1, const std::string& data2);
void setEnd();
bool hasEnded();
void terminate();
void publish(const std::string& header, const char* data, std::size_t size);
void publishTwoParts(const std::string& header, const char* data1, std::size_t size1, const char* data2, std::size_t size2);
zmq::message_t * processInitCommand();
zmq::message_t * processSubscribePublisherCommand();
zmq::message_t * processCancelPublisherSyncCommand();
......
......@@ -17,8 +17,8 @@
#include "RequestImpl.h"
#include "../Application.h"
#include "../Serializer.h"
#include "ApplicationImpl.h"
#include "Serializer.h"
using namespace std;
......
......@@ -19,8 +19,8 @@
#include <boost/bind.hpp>
#include <sstream>
#include "../Application.h"
#include "../Serializer.h"
#include "ApplicationImpl.h"
#include "Serializer.h"
using namespace std;
using namespace boost;
......
......@@ -19,9 +19,9 @@
#include <boost/bind.hpp>
#include <sstream>
#include "../Application.h"
#include "../Serializer.h"
#include "ApplicationImpl.h"
#include "RequestImpl.h"
#include "Serializer.h"
using namespace std;
using namespace boost;
......
......@@ -17,8 +17,9 @@
#include "SubscriberImpl.h"
#include <sstream>
#include "../Serializer.h"
#include "CancelIdGenerator.h"
#include "Serializer.h"
#include "ServicesImpl.h"
#include "../Server.h"
......@@ -235,6 +236,61 @@ bool SubscriberImpl::receive(std::vector<double>& data) {
return true;
}
bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) {
while (true) {
zmq::message_t * message = new zmq::message_t();
m_subscriber->recv(message);
string response(static_cast<char*>(message->data()), message->size());
delete message;
if (response == STREAM) {
message = new zmq::message_t();
m_subscriber->recv(message);
data1 = string(static_cast<char*>(message->data()), message->size());
delete message;
message = new zmq::message_t();
m_subscriber->recv(message);
data2 = string(static_cast<char*>(message->data()), message->size());
delete message;
return true;
} else if (response == ENDSTREAM) {
m_endOfStream = true;
return false;
} else if (response == CANCEL) {
return false;
} else if (response == STATUS) {
message = new zmq::message_t();
m_subscriber->recv(message);
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray(message->data(), message->size());
delete message;
if (protoStatus.id() == m_instanceId) {
application::State state = protoStatus.applicationstate();
// test the terminal state
if (state == application::SUCCESS
|| state == application::STOPPED
|| state == application::KILLED
|| state == application::FAILURE) {
// Exit because the remote application has terminated.
return false;
}
}
}
}
return false;
}
WaitingImpl * SubscriberImpl::waiting() {
// Waiting gets the cancel publisher.
......
......@@ -42,6 +42,7 @@ public:
bool receive(std::vector<int64_t>& data);
bool receive(std::vector<float>& data);
bool receive(std::vector<double>& data);
bool receiveTwoBinaryParts(std::string& data1, std::string& data2);
WaitingImpl * waiting();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment