Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Cameo
cameo
Commits
c2fa6de2
Commit
c2fa6de2
authored
Mar 05, 2020
by
legoc
Browse files
Services uses request socket
parent
2ab325b8
Changes
3
Hide whitespace changes
Inline
Side-by-side
src/cameo/Services.cpp
View file @
c2fa6de2
...
...
@@ -126,7 +126,7 @@ void Services::initStatus() {
std
::
unique_ptr
<
EventStreamSocket
>
Services
::
openEventStream
()
{
//
i
nit the status
if needed
//
I
nit the status
port if necessary.
if
(
m_statusPort
==
0
)
{
initStatus
();
}
...
...
@@ -136,15 +136,14 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint
<<
"inproc://cancel."
<<
CancelIdGenerator
::
newId
();
//
c
reate sockets
//
C
reate
the
sockets
.
zmq
::
socket_t
*
cancelPublisher
=
m_impl
->
createCancelPublisher
(
cancelEndpoint
.
str
());
zmq
::
socket_t
*
subscriber
=
m_impl
->
createEventSubscriber
(
m_serverStatusEndpoint
,
cancelEndpoint
.
str
());
// wait for the connection
string
strRequestType
=
m_impl
->
createRequestType
(
PROTO_INIT
);
string
strRequestData
=
m_impl
->
createInitRequest
();
m_impl
->
waitForSubscriber
(
subscriber
,
strRequestType
,
strRequestData
,
m_serverEndpoint
);
// Wait for the connection to be ready.
m_impl
->
waitForSubscriber
(
subscriber
,
m_requestSocket
.
get
());
// Create the event stream socket.
return
unique_ptr
<
EventStreamSocket
>
(
new
EventStreamSocket
(
new
StreamSocketImpl
(
subscriber
,
cancelPublisher
)));
}
...
...
@@ -154,15 +153,17 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
return
nullptr
;
}
// Prepare our context and subscriber
// Prepare our context and subscriber
.
string
streamEndpoint
=
m_url
+
":"
+
to_string
(
port
);
// We define a unique name that depends on the event stream socket object because there can be many (instances).
string
cancelEndpoint
=
"inproc://cancel."
+
to_string
(
CancelIdGenerator
::
newId
());
// Create the sockets.
zmq
::
socket_t
*
cancelPublisher
=
m_impl
->
createCancelPublisher
(
cancelEndpoint
);
zmq
::
socket_t
*
subscriber
=
m_impl
->
createOutputStreamSubscriber
(
streamEndpoint
,
cancelEndpoint
);
// Create the output stream socket.
return
unique_ptr
<
OutputStreamSocket
>
(
new
OutputStreamSocket
(
new
StreamSocketImpl
(
subscriber
,
cancelPublisher
)));
}
...
...
src/cameo/impl/ServicesImpl.cpp
View file @
c2fa6de2
...
...
@@ -475,27 +475,6 @@ bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::str
return
false
;
}
void
ServicesImpl
::
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
)
{
// polling subscriber
zmq_pollitem_t
items
[
1
];
items
[
0
].
socket
=
static_cast
<
void
*>
(
*
subscriber
);
items
[
0
].
fd
=
0
;
items
[
0
].
events
=
ZMQ_POLLIN
;
items
[
0
].
revents
=
0
;
bool
ready
=
false
;
while
(
!
ready
)
{
isAvailable
(
strRequestType
,
strRequestData
,
endpoint
,
100
);
// wait for 100ms ?
int
rc
=
zmq
::
poll
(
items
,
1
,
100
);
if
(
rc
!=
0
)
{
ready
=
true
;
}
}
}
void
ServicesImpl
::
subscribeToPublisher
(
const
std
::
string
&
endpoint
)
{
string
strRequestType
=
createRequestType
(
PROTO_SUBSCRIBEPUBLISHER
);
...
...
@@ -525,4 +504,24 @@ bool ServicesImpl::isAvailable(RequestSocketImpl * socket, int timeout) {
return
false
;
}
void
ServicesImpl
::
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocketImpl
*
socket
)
{
// Poll subscriber.
zmq_pollitem_t
items
[
1
];
items
[
0
].
socket
=
static_cast
<
void
*>
(
*
subscriber
);
items
[
0
].
fd
=
0
;
items
[
0
].
events
=
ZMQ_POLLIN
;
items
[
0
].
revents
=
0
;
while
(
true
)
{
isAvailable
(
socket
,
100
);
// Wait for 100ms.
int
rc
=
zmq
::
poll
(
items
,
1
,
100
);
if
(
rc
!=
0
)
{
break
;
}
}
}
}
src/cameo/impl/ServicesImpl.h
View file @
c2fa6de2
...
...
@@ -71,10 +71,10 @@ public:
std
::
string
createShowStreamRequest
(
int
id
)
const
;
bool
isAvailable
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
timeout
);
void
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
);
void
subscribeToPublisher
(
const
std
::
string
&
endpoint
);
bool
isAvailable
(
RequestSocketImpl
*
socket
,
int
timeout
);
void
waitForSubscriber
(
zmq
::
socket_t
*
subscriber
,
RequestSocketImpl
*
socket
);
zmq
::
context_t
m_context
;
int
m_timeout
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment