Commit c0c2281c authored by legoc's avatar legoc
Browse files

Removed SocketWaitingImpl

parent 5470d3b1
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "SocketWaitingImpl.h"
#include "../Messages.h"
#include "Application.h"
#include "WaitingImplSet.h"
using namespace std;
namespace cameo {
SocketWaitingImpl::SocketWaitingImpl(zmq::socket_t* socket, const std::string& message) :
m_socket(socket), m_message(message) {
// Add the object in the waiting set if This exists.
if (application::This::m_instance.m_inited) {
application::This::m_instance.m_waitingSet->add(this);
}
}
SocketWaitingImpl::~SocketWaitingImpl() {
// Remove the object in the waiting set if This exists.
if (application::This::m_instance.m_inited) {
application::This::m_instance.m_waitingSet->remove(this);
}
m_socket->close();
}
void SocketWaitingImpl::cancel() {
zmq::message_t requestType(m_message.length());
string data(message::Event::CANCEL);
zmq::message_t requestData(data.length());
memcpy(requestType.data(), m_message.c_str(), m_message.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_socket->send(requestType, ZMQ_SNDMORE);
m_socket->send(requestData);
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_SOCKETWAITINGIMPL_H_
#define CAMEO_SOCKETWAITINGIMPL_H_
#include "WaitingImpl.h"
#include "zmq.hpp"
#include <string>
#include <memory>
namespace cameo {
class SocketWaitingImpl : public WaitingImpl {
public:
SocketWaitingImpl(zmq::socket_t* socket, const std::string& message);
virtual ~SocketWaitingImpl();
virtual void cancel();
zmq::socket_t* m_socket;
std::string m_message;
};
}
#endif
......@@ -16,13 +16,14 @@
#include "SubscriberImpl.h"
#include "../../base/CancelIdGenerator.h"
#include "Serializer.h"
#include "Server.h"
#include "JSON.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/CancelIdGenerator.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/impl/GenericWaitingImpl.h"
#include <sstream>
namespace cameo {
......@@ -215,10 +216,23 @@ std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBi
return {};
}
void SubscriberImpl::cancel() {
std::string m_message = message::Event::CANCEL;
zmq::message_t requestType(m_message.length());
std::string data(message::Event::CANCEL);
zmq::message_t requestData(data.length());
memcpy(requestType.data(), m_message.c_str(), m_message.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelPublisher->send(requestType, ZMQ_SNDMORE);
m_cancelPublisher->send(requestData);
}
WaitingImpl * SubscriberImpl::waiting() {
// Waiting gets the cancel publisher.
return new SocketWaitingImpl(m_cancelPublisher.get(), message::Event::CANCEL);
return new GenericWaitingImpl(std::bind(&SubscriberImpl::cancel, this));
}
std::string SubscriberImpl::createSubscribePublisherRequest() const {
......
......@@ -17,7 +17,6 @@
#ifndef CAMEO_SUBSCRIBERIMPL_H_
#define CAMEO_SUBSCRIBERIMPL_H_
#include "../../base/impl/SocketWaitingImpl.h"
#include "Application.h"
#include "zmq.hpp"
#include <string>
......@@ -28,6 +27,7 @@
namespace cameo {
class Server;
class WaitingImpl;
namespace coms {
......@@ -46,6 +46,7 @@ public:
std::optional<std::string> receive();
std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts();
void cancel();
WaitingImpl * waiting();
std::string createSubscribePublisherRequest() const;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment