Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Cameo
cameo
Commits
f0e93273
Commit
f0e93273
authored
Mar 05, 2020
by
legoc
Browse files
Use request socket in Requester
parent
33a6add6
Changes
6
Hide whitespace changes
Inline
Side-by-side
src/cameo/Application.cpp
View file @
f0e93273
...
...
@@ -752,7 +752,7 @@ std::unique_ptr<Instance>& InstanceArray::operator[](std::size_t index) {
///////////////////////////////////////////////////////////////////////////////
// Publisher
Publisher
::
Publisher
(
const
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
)
:
Publisher
::
Publisher
(
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
)
:
m_impl
(
new
PublisherImpl
(
application
,
publisherPort
,
synchronizerPort
,
name
,
numberOfSubscribers
))
{
// Create the waiting here.
...
...
@@ -1061,7 +1061,7 @@ bool Responder::isCanceled() const {
///////////////////////////////////////////////////////////////////////////
// Requester
Requester
::
Requester
(
const
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
)
:
Requester
::
Requester
(
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
)
:
m_impl
(
new
RequesterImpl
(
application
,
url
,
requesterPort
,
responderPort
,
name
,
responderId
,
requesterId
))
{
// Create the waiting here.
...
...
@@ -1077,14 +1077,18 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
string
responderUrl
=
instance
.
getUrl
();
string
responderEndpoint
=
instance
.
getEndpoint
();
// Create a request socket to the server of the instance.
unique_ptr
<
RequestSocketImpl
>
instanceRequestSocket
=
This
::
m_instance
.
createRequestSocket
(
responderEndpoint
);
string
responderPortName
=
ResponderImpl
::
RESPONDER_PREFIX
+
name
;
int
requesterId
=
RequesterImpl
::
newRequesterId
();
string
requesterPortName
=
RequesterImpl
::
getRequesterPortName
(
name
,
responderId
,
requesterId
);
string
strRequestType
=
This
::
m_instance
.
m_impl
->
createRequestType
(
PROTO_CONNECTPORT
);
string
strRequestData
=
This
::
m_instance
.
m_impl
->
createConnectPortRequest
(
responderId
,
responderPortName
);
string
requestTypePart
=
This
::
m_instance
.
m_impl
->
createRequestType
(
PROTO_CONNECTPORT
);
string
requestDataPart
=
This
::
m_instance
.
m_impl
->
createConnectPortRequest
(
responderId
,
responderPortName
);
unique_ptr
<
zmq
::
message_t
>
reply
=
instanceRequestSocket
->
request
(
requestTypePart
,
requestDataPart
);
unique_ptr
<
zmq
::
message_t
>
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
responderEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
reply
.
reset
();
...
...
@@ -1095,7 +1099,8 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
instance
.
waitFor
(
0
,
responderPortName
);
// Retry to connect.
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
responderEndpoint
);
reply
=
instanceRequestSocket
->
request
(
requestTypePart
,
requestDataPart
);
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
responderPort
=
requestResponse
.
value
();
...
...
src/cameo/Application.h
View file @
f0e93273
...
...
@@ -324,7 +324,7 @@ public:
bool
isEnded
()
const
;
private:
Publisher
(
const
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
);
Publisher
(
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
);
std
::
unique_ptr
<
PublisherImpl
>
m_impl
;
std
::
unique_ptr
<
WaitingImpl
>
m_waiting
;
...
...
@@ -471,7 +471,7 @@ public:
bool
isCanceled
()
const
;
private:
Requester
(
const
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
);
Requester
(
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
);
std
::
unique_ptr
<
RequesterImpl
>
m_impl
;
std
::
unique_ptr
<
WaitingImpl
>
m_waiting
;
...
...
src/cameo/impl/PublisherImpl.cpp
View file @
f0e93273
...
...
@@ -18,6 +18,7 @@
#include "../Application.h"
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using
namespace
std
;
...
...
@@ -28,7 +29,7 @@ const std::string PublisherImpl::SYNC = "SYNC";
const
std
::
string
PublisherImpl
::
STREAM
=
"STREAM"
;
const
std
::
string
PublisherImpl
::
ENDSTREAM
=
"ENDSTREAM"
;
PublisherImpl
::
PublisherImpl
(
const
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
)
:
PublisherImpl
::
PublisherImpl
(
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
)
:
m_application
(
application
),
m_publisherPort
(
publisherPort
),
m_synchronizerPort
(
synchronizerPort
),
...
...
@@ -134,13 +135,14 @@ void PublisherImpl::cancelWaitForSubscribers() {
stringstream
endpoint
;
endpoint
<<
m_application
->
getUrl
()
<<
":"
<<
(
m_publisherPort
+
1
);
string
strRequestType
=
m_application
->
m_impl
->
createRequestType
(
PROTO_CANCEL
);
string
strRequestData
;
string
requestDataPart
;
proto
::
CancelPublisherSyncCommand
cancelPublisherSyncCommand
;
cancelPublisherSyncCommand
.
SerializeToString
(
&
strR
equestData
);
cancelPublisherSyncCommand
.
SerializeToString
(
&
r
equestData
Part
);
unique_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
str
());
// Create a request socket only for the request.
unique_ptr
<
RequestSocketImpl
>
requestSocket
=
m_application
->
createRequestSocket
(
endpoint
.
str
());
unique_ptr
<
zmq
::
message_t
>
reply
=
requestSocket
->
request
(
m_application
->
m_impl
->
createRequestType
(
PROTO_CANCEL
),
requestDataPart
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
...
...
src/cameo/impl/PublisherImpl.h
View file @
f0e93273
...
...
@@ -32,7 +32,7 @@ namespace application {
class
PublisherImpl
{
public:
PublisherImpl
(
const
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
);
PublisherImpl
(
application
::
This
*
application
,
int
publisherPort
,
int
synchronizerPort
,
const
std
::
string
&
name
,
int
numberOfSubscribers
);
~
PublisherImpl
();
const
std
::
string
&
getName
()
const
;
...
...
@@ -62,7 +62,7 @@ public:
zmq
::
message_t
*
processSubscribePublisherCommand
();
zmq
::
message_t
*
processCancelPublisherSyncCommand
();
const
application
::
This
*
m_application
;
application
::
This
*
m_application
;
int
m_publisherPort
;
int
m_synchronizerPort
;
std
::
string
m_name
;
...
...
src/cameo/impl/RequesterImpl.cpp
View file @
f0e93273
...
...
@@ -18,6 +18,7 @@
#include "../Application.h"
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using
namespace
std
;
...
...
@@ -28,7 +29,7 @@ const std::string RequesterImpl::REQUESTER_PREFIX = "req.";
std
::
mutex
RequesterImpl
::
m_mutex
;
int
RequesterImpl
::
m_requesterCounter
=
0
;
RequesterImpl
::
RequesterImpl
(
const
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
)
:
RequesterImpl
::
RequesterImpl
(
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
)
:
m_application
(
application
),
m_requesterPort
(
requesterPort
),
m_name
(
name
),
...
...
@@ -40,12 +41,15 @@ RequesterImpl::RequesterImpl(const application::This * application, const std::s
repEndpoint
<<
url
<<
":"
<<
responderPort
;
m_responderEndpoint
=
repEndpoint
.
str
();
// create a socket REP
m_requester
.
reset
(
new
zmq
::
socket_t
(
m_application
->
m_impl
->
m_context
,
ZMQ_REP
));
// Create the request socket.
m_requestSocket
=
m_application
->
createRequestSocket
(
m_responderEndpoint
);
// Create a socket REP.
m_repSocket
.
reset
(
new
zmq
::
socket_t
(
m_application
->
m_impl
->
m_context
,
ZMQ_REP
));
stringstream
reqEndpoint
;
reqEndpoint
<<
"tcp://*:"
<<
m_requesterPort
;
m_re
quester
->
bind
(
reqEndpoint
.
str
().
c_str
());
m_re
pSocket
->
bind
(
reqEndpoint
.
str
().
c_str
());
}
RequesterImpl
::~
RequesterImpl
()
{
...
...
@@ -74,8 +78,8 @@ WaitingImpl * RequesterImpl::waiting() {
void
RequesterImpl
::
sendBinary
(
const
std
::
string
&
request
)
{
string
strR
equestType
=
m_application
->
m_impl
->
createRequestType
(
PROTO_REQUEST
);
string
strR
equestData
;
string
r
equestType
Part
=
m_application
->
m_impl
->
createRequestType
(
PROTO_REQUEST
);
string
r
equestData
Part
;
proto
::
Request
requestCommand
;
requestCommand
.
set_applicationname
(
m_application
->
getName
());
...
...
@@ -84,9 +88,9 @@ void RequesterImpl::sendBinary(const std::string& request) {
requestCommand
.
set_serverurl
(
m_application
->
getUrl
());
requestCommand
.
set_serverport
(
m_application
->
getPort
());
requestCommand
.
set_requesterport
(
m_requesterPort
);
requestCommand
.
SerializeToString
(
&
strR
equestData
);
requestCommand
.
SerializeToString
(
&
r
equestData
Part
);
unique_ptr
<
zmq
::
message_t
>
reply
=
m_
application
->
m_impl
->
tryRequestWithOnePartReply
(
strR
equestType
,
strR
equestData
,
m_responderEndpoin
t
);
unique_ptr
<
zmq
::
message_t
>
reply
=
m_
requestSocket
->
request
(
r
equestType
Part
,
r
equestData
Par
t
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
...
...
@@ -102,8 +106,8 @@ void RequesterImpl::send(const std::string& request) {
void
RequesterImpl
::
sendTwoBinaryParts
(
const
std
::
string
&
request1
,
const
std
::
string
&
request2
)
{
string
strR
equestType
=
m_application
->
m_impl
->
createRequestType
(
PROTO_REQUEST
);
string
strR
equestData
;
string
r
equestType
Part
=
m_application
->
m_impl
->
createRequestType
(
PROTO_REQUEST
);
string
r
equestData
Part
;
proto
::
Request
requestCommand
;
requestCommand
.
set_applicationname
(
m_application
->
getName
());
...
...
@@ -113,9 +117,9 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
requestCommand
.
set_serverurl
(
m_application
->
getUrl
());
requestCommand
.
set_serverport
(
m_application
->
getPort
());
requestCommand
.
set_requesterport
(
m_requesterPort
);
requestCommand
.
SerializeToString
(
&
strR
equestData
);
requestCommand
.
SerializeToString
(
&
r
equestData
Part
);
unique_ptr
<
zmq
::
message_t
>
reply
=
m_
application
->
m_impl
->
tryRequestWithOnePartReply
(
strR
equestType
,
strR
equestData
,
m_responderEndpoin
t
);
unique_ptr
<
zmq
::
message_t
>
reply
=
m_
requestSocket
->
request
(
r
equestType
Part
,
r
equestData
Par
t
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
...
...
@@ -124,7 +128,7 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
bool
RequesterImpl
::
receiveBinary
(
std
::
string
&
response
)
{
unique_ptr
<
zmq
::
message_t
>
message
(
new
zmq
::
message_t
);
m_re
quester
->
recv
(
message
.
get
(),
0
);
m_re
pSocket
->
recv
(
message
.
get
(),
0
);
// multi-part message, first part is the type
proto
::
MessageType
messageType
;
...
...
@@ -132,7 +136,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
if
(
message
->
more
())
{
message
.
reset
(
new
zmq
::
message_t
);
m_re
quester
->
recv
(
message
.
get
(),
0
);
m_re
pSocket
->
recv
(
message
.
get
(),
0
);
}
else
{
cerr
<<
"unexpected number of frames, should be 2"
<<
endl
;
...
...
@@ -152,7 +156,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
unique_ptr
<
zmq
::
message_t
>
reply
(
new
zmq
::
message_t
(
size
));
memcpy
((
void
*
)
reply
->
data
(),
data
.
c_str
(),
size
);
m_re
quester
->
send
(
*
reply
);
m_re
pSocket
->
send
(
*
reply
);
return
!
m_canceled
;
}
...
...
@@ -172,10 +176,9 @@ void RequesterImpl::cancel() {
stringstream
requesterEndpoint
;
requesterEndpoint
<<
m_application
->
getUrl
()
<<
":"
<<
m_requesterPort
;
string
strRequestType
=
m_application
->
m_impl
->
createRequestType
(
PROTO_CANCEL
);
string
strRequestData
=
"cancel"
;
unique_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
requesterEndpoint
.
str
());
// Create a request socket only for the request.
unique_ptr
<
RequestSocketImpl
>
requestSocket
=
m_application
->
createRequestSocket
(
requesterEndpoint
.
str
());
unique_ptr
<
zmq
::
message_t
>
reply
=
requestSocket
->
request
(
m_application
->
m_impl
->
createRequestType
(
PROTO_CANCEL
),
"cancel"
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
...
...
@@ -183,14 +186,16 @@ void RequesterImpl::cancel() {
void
RequesterImpl
::
terminate
()
{
if
(
m_re
quester
.
get
()
!=
nullptr
)
{
m_re
quester
.
reset
(
nullptr
);
if
(
m_re
pSocket
.
get
()
!=
nullptr
)
{
m_re
pSocket
.
reset
(
nullptr
);
bool
success
=
m_application
->
removePort
(
getRequesterPortName
(
m_name
,
m_responderId
,
m_requesterId
));
if
(
!
success
)
{
cerr
<<
"server cannot destroy requester "
<<
m_name
<<
endl
;
}
}
m_requestSocket
.
reset
();
}
}
src/cameo/impl/RequesterImpl.h
View file @
f0e93273
...
...
@@ -30,10 +30,12 @@ namespace application {
class
This
;
}
class
RequestSocketImpl
;
class
RequesterImpl
{
public:
RequesterImpl
(
const
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
);
RequesterImpl
(
application
::
This
*
application
,
const
std
::
string
&
url
,
int
requesterPort
,
int
responderPort
,
const
std
::
string
&
name
,
int
responderId
,
int
requesterId
);
~
RequesterImpl
();
static
int
newRequesterId
();
...
...
@@ -51,13 +53,14 @@ public:
void
cancel
();
void
terminate
();
const
application
::
This
*
m_application
;
application
::
This
*
m_application
;
int
m_requesterPort
;
std
::
string
m_responderEndpoint
;
std
::
string
m_name
;
int
m_responderId
;
int
m_requesterId
;
std
::
unique_ptr
<
zmq
::
socket_t
>
m_requester
;
std
::
string
m_responderEndpoint
;
std
::
unique_ptr
<
RequestSocketImpl
>
m_requestSocket
;
std
::
unique_ptr
<
zmq
::
socket_t
>
m_repSocket
;
bool
m_canceled
;
static
const
std
::
string
REQUESTER_PREFIX
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment