Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Cameo
cameo
Commits
924d8cfe
Commit
924d8cfe
authored
Dec 09, 2021
by
legoc
Browse files
Renamed ContextImpl into ContextZmq
parent
ae998829
Changes
16
Hide whitespace changes
Inline
Side-by-side
cameo-api-cpp/include/Server.h
View file @
924d8cfe
...
...
@@ -37,7 +37,7 @@ namespace application {
class
EventListener
;
class
EventThread
;
class
Context
Impl
;
class
Context
Zmq
;
class
RequestSocket
;
class
Server
{
...
...
@@ -166,7 +166,7 @@ private:
Endpoint
m_serverEndpoint
;
std
::
array
<
int
,
3
>
m_serverVersion
;
int
m_statusPort
;
std
::
unique_ptr
<
Context
Impl
>
m_contextImpl
;
std
::
unique_ptr
<
Context
Zmq
>
m_contextImpl
;
std
::
unique_ptr
<
RequestSocket
>
m_requestSocket
;
std
::
mutex
m_eventListenersMutex
;
...
...
cameo-api-cpp/src/base/Application.cpp
View file @
924d8cfe
...
...
@@ -29,7 +29,7 @@
#include "impl/StreamSocketImpl.h"
#include "impl/WaitingImpl.h"
#include "impl/WaitingImplSet.h"
#include "impl/Context
Impl
.h"
#include "impl/
zmq/
Context
Zmq
.h"
#include "Strings.h"
#include "Server.h"
#include "Messages.h"
...
...
cameo-api-cpp/src/base/OutputStreamSocket.cpp
View file @
924d8cfe
...
...
@@ -19,7 +19,7 @@
#include "JSON.h"
#include "impl/SocketWaitingImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/Context
Impl
.h"
#include "impl/
zmq/
Context
Zmq
.h"
#include "Messages.h"
#include <iostream>
...
...
cameo-api-cpp/src/base/RequestSocket.cpp
View file @
924d8cfe
...
...
@@ -17,7 +17,6 @@
#include "RequestSocket.h"
#include "ConnectionTimeout.h"
#include "impl/ContextImpl.h"
#include "impl/zmq/RequestSocketZmq.h"
#include <iostream>
#include <chrono>
...
...
cameo-api-cpp/src/base/Server.cpp
View file @
924d8cfe
...
...
@@ -23,7 +23,7 @@
#include "EventThread.h"
#include "impl/CancelIdGenerator.h"
#include "impl/StreamSocketImpl.h"
#include "impl/Context
Impl
.h"
#include "impl/
zmq/
Context
Zmq
.h"
#include "JSON.h"
#include "Messages.h"
#include "RequestSocket.h"
...
...
@@ -636,7 +636,7 @@ void Server::unregisterEventListener(EventListener * listener) {
void
Server
::
initContext
()
{
// Set the impl.
m_contextImpl
.
reset
(
new
Context
Impl
());
m_contextImpl
.
reset
(
new
Context
Zmq
());
}
void
Server
::
initRequestSocket
()
{
...
...
cameo-api-cpp/src/base/impl/Context
Impl
.cpp
→
cameo-api-cpp/src/base/impl/
zmq/
Context
Zmq
.cpp
View file @
924d8cfe
...
...
@@ -14,37 +14,36 @@
* limitations under the Licence.
*/
#include "ContextZmq.h"
#include "SocketException.h"
#include "ConnectionTimeout.h"
#include "JSON.h"
#include "../Messages.h"
#include "../
../
Messages.h"
#include "../../RequestSocket.h"
#include <iostream>
#include <sstream>
#include "ContextImpl.h"
#include "../RequestSocket.h"
using
namespace
std
;
namespace
cameo
{
Context
Impl
::
Context
Impl
()
:
Context
(),
Context
Zmq
::
Context
Zmq
()
:
Context
(),
m_context
(
1
),
m_timeout
(
0
)
{
}
Context
Impl
::~
Context
Impl
()
{
Context
Zmq
::~
Context
Zmq
()
{
}
void
Context
Impl
::
setTimeout
(
int
timeout
)
{
void
Context
Zmq
::
setTimeout
(
int
timeout
)
{
m_timeout
=
timeout
;
}
int
Context
Impl
::
getTimeout
()
const
{
int
Context
Zmq
::
getTimeout
()
const
{
return
m_timeout
;
}
zmq
::
socket_t
*
Context
Impl
::
createEventSubscriber
(
const
std
::
string
&
endpoint
,
const
std
::
string
&
cancelEndpoint
)
{
zmq
::
socket_t
*
Context
Zmq
::
createEventSubscriber
(
const
std
::
string
&
endpoint
,
const
std
::
string
&
cancelEndpoint
)
{
zmq
::
socket_t
*
subscriber
=
new
zmq
::
socket_t
(
m_context
,
ZMQ_SUB
);
...
...
@@ -66,7 +65,7 @@ zmq::socket_t * ContextImpl::createEventSubscriber(const std::string& endpoint,
return
subscriber
;
}
zmq
::
socket_t
*
Context
Impl
::
createOutputStreamSubscriber
(
const
std
::
string
&
endpoint
,
const
std
::
string
&
cancelEndpoint
)
{
zmq
::
socket_t
*
Context
Zmq
::
createOutputStreamSubscriber
(
const
std
::
string
&
endpoint
,
const
std
::
string
&
cancelEndpoint
)
{
zmq
::
socket_t
*
subscriber
=
new
zmq
::
socket_t
(
m_context
,
ZMQ_SUB
);
...
...
@@ -86,7 +85,7 @@ zmq::socket_t * ContextImpl::createOutputStreamSubscriber(const std::string& end
return
subscriber
;
}
zmq
::
socket_t
*
Context
Impl
::
createCancelPublisher
(
const
std
::
string
&
endpoint
)
{
zmq
::
socket_t
*
Context
Zmq
::
createCancelPublisher
(
const
std
::
string
&
endpoint
)
{
zmq
::
socket_t
*
publisher
=
new
zmq
::
socket_t
(
m_context
,
ZMQ_PUB
);
publisher
->
bind
(
endpoint
.
c_str
());
...
...
@@ -94,7 +93,7 @@ zmq::socket_t * ContextImpl::createCancelPublisher(const std::string& endpoint)
return
publisher
;
}
zmq
::
socket_t
*
Context
Impl
::
createRequestSocket
(
const
std
::
string
&
endpoint
)
{
zmq
::
socket_t
*
Context
Zmq
::
createRequestSocket
(
const
std
::
string
&
endpoint
)
{
zmq
::
socket_t
*
socket
=
new
zmq
::
socket_t
(
m_context
,
ZMQ_REQ
);
...
...
@@ -113,7 +112,7 @@ zmq::socket_t * ContextImpl::createRequestSocket(const std::string& endpoint) {
return
socket
;
}
bool
Context
Impl
::
isAvailable
(
RequestSocket
*
socket
,
int
timeout
)
{
bool
Context
Zmq
::
isAvailable
(
RequestSocket
*
socket
,
int
timeout
)
{
try
{
socket
->
requestJSON
(
createSyncRequest
(),
timeout
);
...
...
@@ -129,7 +128,7 @@ bool ContextImpl::isAvailable(RequestSocket * socket, int timeout) {
return
false
;
}
void
Context
Impl
::
sendSyncStream
(
RequestSocket
*
socket
,
const
std
::
string
&
name
)
{
void
Context
Zmq
::
sendSyncStream
(
RequestSocket
*
socket
,
const
std
::
string
&
name
)
{
try
{
socket
->
requestJSON
(
createSyncStreamRequest
(
name
));
...
...
@@ -142,7 +141,7 @@ void ContextImpl::sendSyncStream(RequestSocket * socket, const std::string& name
}
}
void
Context
Impl
::
waitForStreamSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocket
*
socket
,
const
std
::
string
&
name
)
{
void
Context
Zmq
::
waitForStreamSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocket
*
socket
,
const
std
::
string
&
name
)
{
// Poll subscriber.
zmq_pollitem_t
items
[
1
];
...
...
@@ -162,7 +161,7 @@ void ContextImpl::waitForStreamSubscriber(zmq::socket_t * subscriber, RequestSoc
}
}
void
Context
Impl
::
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocket
*
socket
)
{
void
Context
Zmq
::
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocket
*
socket
)
{
// Poll subscriber.
zmq_pollitem_t
items
[
1
];
...
...
cameo-api-cpp/src/base/impl/Context
Impl
.h
→
cameo-api-cpp/src/base/impl/
zmq/
Context
Zmq
.h
View file @
924d8cfe
...
...
@@ -14,23 +14,23 @@
* limitations under the Licence.
*/
#ifndef CAMEO_CONTEXT
IMPL
_H_
#define CAMEO_CONTEXT
IMPL
_H_
#ifndef CAMEO_CONTEXT
ZMQ
_H_
#define CAMEO_CONTEXT
ZMQ
_H_
#include "Context.h"
#include "zmq.hpp"
#include <vector>
#include <memory>
#include <zmq.hpp>
namespace
cameo
{
class
RequestSocket
;
class
Context
Impl
:
public
Context
{
class
Context
Zmq
:
public
Context
{
public:
Context
Impl
();
virtual
~
Context
Impl
();
Context
Zmq
();
virtual
~
Context
Zmq
();
void
setTimeout
(
int
timeout
);
int
getTimeout
()
const
;
...
...
cameo-api-cpp/src/base/impl/zmq/RequestSocketZmq.cpp
View file @
924d8cfe
...
...
@@ -17,7 +17,7 @@
#include "RequestSocketZmq.h"
#include "ConnectionTimeout.h"
#include "
../
Context
Impl
.h"
#include "Context
Zmq
.h"
#include <iostream>
#include <chrono>
#include <thread>
...
...
@@ -27,7 +27,7 @@ using namespace std;
namespace
cameo
{
RequestSocketZmq
::
RequestSocketZmq
(
Context
*
context
,
const
std
::
string
&
endpoint
,
int
timeout
)
:
m_services
(
dynamic_cast
<
Context
Impl
*>
(
context
)),
m_endpoint
(
endpoint
)
{
m_services
(
dynamic_cast
<
Context
Zmq
*>
(
context
)),
m_endpoint
(
endpoint
)
{
init
();
...
...
cameo-api-cpp/src/base/impl/zmq/RequestSocketZmq.h
View file @
924d8cfe
...
...
@@ -24,7 +24,7 @@
namespace
cameo
{
class
Context
;
class
Context
Impl
;
class
Context
Zmq
;
class
RequestSocketZmq
:
public
RequestSocketImpl
{
...
...
@@ -38,7 +38,7 @@ public:
virtual
std
::
string
request
(
const
std
::
string
&
requestPart1
,
const
std
::
string
&
requestPart2
,
int
overrideTimeout
);
virtual
std
::
string
request
(
const
std
::
string
&
requestPart1
,
const
std
::
string
&
requestPart2
,
const
std
::
string
&
requestPart3
,
int
overrideTimeout
);
Context
Impl
*
m_services
;
Context
Zmq
*
m_services
;
std
::
string
m_endpoint
;
std
::
unique_ptr
<
zmq
::
socket_t
>
m_socket
;
int
m_timeout
;
...
...
cameo-api-cpp/src/coms/PublisherSubscriber.cpp
View file @
924d8cfe
...
...
@@ -16,13 +16,13 @@
#include "PublisherSubscriber.h"
#include "../base/impl/ContextImpl.h"
#include "JSON.h"
#include "Server.h"
#include "../base/impl/zmq/ContextZmq.h"
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "impl/PublisherImpl.h"
#include "impl/SubscriberImpl.h"
#include "JSON.h"
#include "Server.h"
namespace
cameo
{
namespace
coms
{
...
...
cameo-api-cpp/src/coms/RequesterResponder.cpp
View file @
924d8cfe
...
...
@@ -16,14 +16,14 @@
#include "RequesterResponder.h"
#include "../base/impl/ContextImpl.h"
#include "JSON.h"
#include "Server.h"
#include "../base/impl/zmq/ContextZmq.h"
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "impl/RequesterImpl.h"
#include "impl/RequestImpl.h"
#include "impl/ResponderImpl.h"
#include "JSON.h"
#include "Server.h"
namespace
cameo
{
namespace
coms
{
...
...
cameo-api-cpp/src/coms/impl/PublisherImpl.cpp
View file @
924d8cfe
...
...
@@ -19,7 +19,7 @@
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/Context
Impl
.h"
#include "../../base/impl/
zmq/
Context
Zmq
.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
...
...
@@ -34,7 +34,7 @@ PublisherImpl::PublisherImpl(int publisherPort, int synchronizerPort, const std:
m_ended
(
false
)
{
// create a socket for publishing
Context
Impl
*
contextImpl
=
dynamic_cast
<
Context
Impl
*>
(
application
::
This
::
getCom
().
getContext
());
Context
Zmq
*
contextImpl
=
dynamic_cast
<
Context
Zmq
*>
(
application
::
This
::
getCom
().
getContext
());
m_publisher
.
reset
(
new
zmq
::
socket_t
(
contextImpl
->
m_context
,
ZMQ_PUB
));
std
::
stringstream
pubEndpoint
;
pubEndpoint
<<
"tcp://*:"
<<
publisherPort
;
...
...
@@ -69,7 +69,7 @@ bool PublisherImpl::waitForSubscribers() {
}
// Create a socket to receive the messages from the subscribers.
Context
Impl
*
contextImpl
=
dynamic_cast
<
Context
Impl
*>
(
application
::
This
::
getCom
().
getContext
());
Context
Zmq
*
contextImpl
=
dynamic_cast
<
Context
Zmq
*>
(
application
::
This
::
getCom
().
getContext
());
zmq
::
socket_t
synchronizer
(
contextImpl
->
m_context
,
ZMQ_REP
);
std
::
stringstream
syncEndpoint
;
...
...
cameo-api-cpp/src/coms/impl/RequestImpl.cpp
View file @
924d8cfe
...
...
@@ -19,7 +19,7 @@
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/Context
Impl
.h"
#include "../../base/impl/
zmq/
Context
Zmq
.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
...
...
cameo-api-cpp/src/coms/impl/RequesterImpl.cpp
View file @
924d8cfe
...
...
@@ -19,7 +19,7 @@
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/Context
Impl
.h"
#include "../../base/impl/
zmq/
Context
Zmq
.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
...
...
@@ -42,7 +42,7 @@ RequesterImpl::RequesterImpl(const Endpoint& endpoint, int requesterPort, int re
m_requestSocket
=
application
::
This
::
getCom
().
createRequestSocket
(
endpoint
.
withPort
(
responderPort
).
toString
());
// Create a socket REP.
Context
Impl
*
contextImpl
=
dynamic_cast
<
Context
Impl
*>
(
application
::
This
::
getCom
().
getContext
());
Context
Zmq
*
contextImpl
=
dynamic_cast
<
Context
Zmq
*>
(
application
::
This
::
getCom
().
getContext
());
m_repSocket
.
reset
(
new
zmq
::
socket_t
(
contextImpl
->
m_context
,
ZMQ_REP
));
std
::
stringstream
reqEndpoint
;
reqEndpoint
<<
"tcp://*:"
<<
m_requesterPort
;
...
...
cameo-api-cpp/src/coms/impl/ResponderImpl.cpp
View file @
924d8cfe
...
...
@@ -20,7 +20,7 @@
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/Context
Impl
.h"
#include "../../base/impl/
zmq/
Context
Zmq
.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
...
...
@@ -36,7 +36,7 @@ ResponderImpl::ResponderImpl(int responderPort, const std::string& name) :
m_canceled
(
false
)
{
// create a socket REP
Context
Impl
*
contextImpl
=
dynamic_cast
<
Context
Impl
*>
(
application
::
This
::
getCom
().
getContext
());
Context
Zmq
*
contextImpl
=
dynamic_cast
<
Context
Zmq
*>
(
application
::
This
::
getCom
().
getContext
());
m_responder
.
reset
(
new
zmq
::
socket_t
(
contextImpl
->
m_context
,
ZMQ_REP
));
std
::
stringstream
repEndpoint
;
repEndpoint
<<
"tcp://*:"
<<
m_responderPort
;
...
...
cameo-api-cpp/src/coms/impl/SubscriberImpl.cpp
View file @
924d8cfe
...
...
@@ -20,7 +20,7 @@
#include "Server.h"
#include "JSON.h"
#include "../../base/impl/CancelIdGenerator.h"
#include "../../base/impl/Context
Impl
.h"
#include "../../base/impl/
zmq/
Context
Zmq
.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
...
...
@@ -47,7 +47,7 @@ SubscriberImpl::~SubscriberImpl() {
void
SubscriberImpl
::
init
()
{
// Create a socket for publishing.
Context
Impl
*
contextImpl
=
dynamic_cast
<
Context
Impl
*>
(
application
::
This
::
getCom
().
getContext
());
Context
Zmq
*
contextImpl
=
dynamic_cast
<
Context
Zmq
*>
(
application
::
This
::
getCom
().
getContext
());
m_subscriber
.
reset
(
new
zmq
::
socket_t
(
contextImpl
->
m_context
,
ZMQ_SUB
));
m_subscriber
->
setsockopt
(
ZMQ_SUBSCRIBE
,
message
::
Event
::
SYNC
,
std
::
string
(
message
::
Event
::
SYNC
).
length
());
m_subscriber
->
setsockopt
(
ZMQ_SUBSCRIBE
,
message
::
Event
::
STREAM
,
std
::
string
(
message
::
Event
::
STREAM
).
length
());
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment