Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Cameo
cameo
Commits
b4b4f3cd
Commit
b4b4f3cd
authored
Mar 21, 2018
by
legoc
Browse files
Corrected some memory leaks by changing the return type of tryRequestWithOnePartReply
parent
20031579
Changes
10
Hide whitespace changes
Inline
Side-by-side
ChangeLog
View file @
b4b4f3cd
...
...
@@ -3,6 +3,7 @@
* Enabled to define more than one requester on the same responder in one Application instance.
* Removed zmq.hpp as it should be installed.
* Corrected some memory leaks by changing the return type of tryRequestWithOnePartReply.
0.1.0
-----
...
...
configure.ac
View file @
b4b4f3cd
...
...
@@ -4,7 +4,7 @@
#
# -----------------------------------------------------------------------------
AC_INIT(cameo-api-cpp, 0.1.2
-dev
)
AC_INIT(cameo-api-cpp, 0.1.2)
LIBRARY_VERSION=0:1:2
AC_CONFIG_AUX_DIR(config)
...
...
src/cameo/Application.cpp
View file @
b4b4f3cd
...
...
@@ -261,11 +261,10 @@ int This::initUnmanagedApplication() {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_STARTEDUNMANAGED
);
string
strRequestData
=
m_impl
->
createStartedUnmanagedRequest
(
m_name
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
return
requestResponse
.
value
();
}
...
...
@@ -274,22 +273,20 @@ void This::terminateUnmanagedApplication() {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_TERMINATEDUNMANAGED
);
string
strRequestData
=
m_impl
->
createTerminatedUnmanagedRequest
(
m_id
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
bool
This
::
setRunning
()
{
string
strRequestType
=
m_instance
.
m_impl
->
createRequest
(
PROTO_SETSTATUS
);
string
strRequestData
=
m_instance
.
m_impl
->
createSetStatusRequest
(
m_instance
.
m_id
,
RUNNING
);
zmq
::
message_t
*
reply
=
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_instance
.
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_instance
.
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
if
(
requestResponse
.
value
()
==
-
1
)
{
return
false
;
...
...
@@ -303,10 +300,9 @@ void This::setBinaryResult(const std::string& data) {
string
strRequestType
=
m_instance
.
m_impl
->
createRequest
(
PROTO_SETRESULT
);
string
strRequestData
=
m_instance
.
m_impl
->
createSetResultRequest
(
m_instance
.
m_id
,
data
);
zmq
::
message_t
*
reply
=
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_instance
.
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_instance
.
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
if
(
requestResponse
.
value
()
==
-
1
)
{
//throw ?;
...
...
@@ -353,11 +349,10 @@ State This::getState(int id) const {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_GETSTATUS
);
string
strRequestData
=
m_impl
->
createGetStatusRequest
(
id
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
StatusEvent
protoStatus
;
protoStatus
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
return
protoStatus
.
applicationstate
();
}
...
...
@@ -366,11 +361,10 @@ bool This::destroyPublisher(const std::string& name) const {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_TERMINATEPUBLISHER
);
string
strRequestData
=
m_impl
->
createTerminatePublisherRequest
(
m_id
,
name
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
value
=
requestResponse
.
value
();
return
(
value
!=
-
1
);
...
...
@@ -380,11 +374,10 @@ bool This::removePort(const std::string& name) const {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_REMOVEPORT
);
string
strRequestData
=
m_impl
->
createRemovePortRequest
(
m_id
,
name
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
value
=
requestResponse
.
value
();
return
(
value
!=
-
1
);
...
...
@@ -760,10 +753,9 @@ std::auto_ptr<Publisher> Publisher::create(const std::string& name, int numberOf
string
strRequestType
=
This
::
m_instance
.
m_impl
->
createRequest
(
PROTO_CREATEPUBLISHER
);
string
strRequestData
=
This
::
m_instance
.
m_impl
->
createCreatePublisherRequest
(
This
::
m_instance
.
m_id
,
name
,
numberOfSubscribers
);
zmq
::
message_t
*
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
This
::
m_instance
.
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
This
::
m_instance
.
m_serverEndpoint
);
proto
::
PublisherResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
publisherPort
=
requestResponse
.
publisherport
();
if
(
publisherPort
==
-
1
)
{
...
...
@@ -1011,10 +1003,9 @@ std::auto_ptr<Responder> Responder::create(const std::string& name) {
string
strRequestType
=
This
::
m_instance
.
m_impl
->
createRequest
(
PROTO_REQUESTPORT
);
string
strRequestData
=
This
::
m_instance
.
m_impl
->
createRequestPortRequest
(
This
::
m_instance
.
m_id
,
portName
);
zmq
::
message_t
*
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
This
::
m_instance
.
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
This
::
m_instance
.
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
responderPort
=
requestResponse
.
value
();
if
(
responderPort
==
-
1
)
{
...
...
@@ -1071,10 +1062,10 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
string
strRequestType
=
This
::
m_instance
.
m_impl
->
createRequest
(
PROTO_CONNECTPORT
);
string
strRequestData
=
This
::
m_instance
.
m_impl
->
createConnectPortRequest
(
responderId
,
responderPortName
);
zmq
::
message_t
*
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
responderEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
responderEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
reply
.
reset
()
;
int
responderPort
=
requestResponse
.
value
();
if
(
responderPort
==
-
1
)
{
...
...
@@ -1084,12 +1075,13 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
// Retry to connect.
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
responderEndpoint
);
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
responderPort
=
requestResponse
.
value
();
if
(
responderPort
==
-
1
)
{
throw
RequesterCreationException
(
requestResponse
.
message
());
}
reply
.
reset
();
}
// Request a requester port
...
...
@@ -1098,7 +1090,6 @@ std::auto_ptr<Requester> Requester::create(Instance & instance, const std::strin
reply
=
This
::
m_instance
.
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
This
::
m_instance
.
m_serverEndpoint
);
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
requesterPort
=
requestResponse
.
value
();
if
(
requesterPort
==
-
1
)
{
...
...
src/cameo/Server.cpp
View file @
b4b4f3cd
...
...
@@ -106,11 +106,10 @@ std::auto_ptr<application::Instance> Server::start(const std::string& name, cons
string
strRequestData
=
m_impl
->
createStartRequest
(
name
,
args
,
application
::
This
::
getReference
());
try
{
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
if
(
requestResponse
.
value
()
==
-
1
)
{
instance
->
setErrorMessage
(
requestResponse
.
message
());
...
...
@@ -138,11 +137,10 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
strRequestData
=
m_impl
->
createStopRequest
(
id
);
}
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
return
Response
(
requestResponse
.
value
(),
requestResponse
.
message
());
}
...
...
@@ -176,11 +174,10 @@ application::InstanceArray Server::connectAll(const std::string& name) {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_CONNECT
);
string
strRequestData
=
m_impl
->
createConnectRequest
(
name
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
ApplicationInfoListResponse
response
;
response
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
// allocate the array
instances
.
allocate
(
response
.
applicationinfo_size
());
...
...
@@ -249,11 +246,10 @@ bool Server::isAlive(int id) const {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_ISALIVE
);
string
strRequestData
=
m_impl
->
createIsAliveRequest
(
id
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
IsAliveResponse
isAliveResponse
;
isAliveResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
return
isAliveResponse
.
isalive
();
}
...
...
@@ -264,11 +260,10 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_ALLAVAILABLE
);
string
strRequestData
=
m_impl
->
createAllAvailableRequest
();
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
AllAvailableResponse
allAvailableResponse
;
allAvailableResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
for
(
int
i
=
0
;
i
<
allAvailableResponse
.
applicationconfig_size
();
++
i
)
{
proto
::
ApplicationConfig
config
=
allAvailableResponse
.
applicationconfig
(
i
);
...
...
@@ -293,11 +288,10 @@ std::vector<application::Info> Server::getApplicationInfos() const {
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_SHOWALL
);
string
strRequestData
=
m_impl
->
createShowAllRequest
();
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
ApplicationInfoListResponse
response
;
response
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
for
(
int
i
=
0
;
i
<
response
.
applicationinfo_size
();
++
i
)
{
proto
::
ApplicationInfo
info
=
response
.
applicationinfo
(
i
);
...
...
@@ -338,10 +332,9 @@ std::auto_ptr<application::Subscriber> Server::createSubscriber(int id, const st
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_CONNECTPUBLISHER
);
string
strRequestData
=
m_impl
->
createConnectPublisherRequest
(
id
,
publisherName
);
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
PublisherResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
int
publisherPort
=
requestResponse
.
publisherport
();
if
(
publisherPort
==
-
1
)
{
...
...
src/cameo/Services.cpp
View file @
b4b4f3cd
...
...
@@ -99,11 +99,10 @@ void Services::initStatus() {
// get the status port
string
strRequestType
=
m_impl
->
createRequest
(
PROTO_STATUS
);
string
strRequestData
=
m_impl
->
createShowStatusRequest
();
zmq
::
message_t
*
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_serverEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
// reply ok
if
(
requestResponse
.
value
()
==
-
1
)
{
...
...
src/cameo/impl/PublisherImpl.cpp
View file @
b4b4f3cd
...
...
@@ -147,11 +147,10 @@ void PublisherImpl::cancelWaitForSubscribers() {
proto
::
CancelPublisherSyncCommand
cancelPublisherSyncCommand
;
cancelPublisherSyncCommand
.
SerializeToString
(
&
strRequestData
);
zmq
::
message_t
*
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
str
());
auto_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
str
());
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
WaitingImpl
*
PublisherImpl
::
waiting
()
{
...
...
src/cameo/impl/RequesterImpl.cpp
View file @
b4b4f3cd
...
...
@@ -88,11 +88,10 @@ void RequesterImpl::sendBinary(const std::string& request) {
requestCommand
.
set_requesterport
(
m_requesterPort
);
requestCommand
.
SerializeToString
(
&
strRequestData
);
zmq
::
message_t
*
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_responderEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_responderEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
void
RequesterImpl
::
send
(
const
std
::
string
&
request
)
{
...
...
@@ -118,11 +117,10 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
requestCommand
.
set_requesterport
(
m_requesterPort
);
requestCommand
.
SerializeToString
(
&
strRequestData
);
zmq
::
message_t
*
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_responderEndpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
m_responderEndpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
bool
RequesterImpl
::
receiveBinary
(
std
::
string
&
response
)
{
...
...
@@ -184,11 +182,10 @@ void RequesterImpl::cancel() {
string
strRequestType
=
m_application
->
m_impl
->
createRequest
(
PROTO_CANCEL
);
string
strRequestData
=
"cancel"
;
zmq
::
message_t
*
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
requesterEndpoint
.
str
());
auto_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
requesterEndpoint
.
str
());
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
void
RequesterImpl
::
terminate
()
{
...
...
src/cameo/impl/ResponderImpl.cpp
View file @
b4b4f3cd
...
...
@@ -56,11 +56,10 @@ void ResponderImpl::cancel() {
string
strRequestType
=
m_application
->
m_impl
->
createRequest
(
PROTO_CANCEL
);
string
strRequestData
=
"cancel"
;
zmq
::
message_t
*
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
str
());
auto_ptr
<
zmq
::
message_t
>
reply
=
m_application
->
m_impl
->
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
str
());
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
WaitingImpl
*
ResponderImpl
::
waiting
()
{
...
...
src/cameo/impl/ServicesImpl.cpp
View file @
b4b4f3cd
...
...
@@ -82,7 +82,7 @@ std::string ServicesImpl::createStartRequest(const std::string& name, const std:
return
strRequestStart
;
}
zmq
::
message_t
*
ServicesImpl
::
tryRequestWithOnePartReply
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
overrideTimeout
)
{
std
::
auto_ptr
<
zmq
::
message_t
>
ServicesImpl
::
tryRequestWithOnePartReply
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
overrideTimeout
)
{
zmq
::
socket_t
socket
(
m_context
,
ZMQ_REQ
);
try
{
...
...
@@ -127,8 +127,8 @@ zmq::message_t* ServicesImpl::tryRequestWithOnePartReply(const std::string& strR
}
}
zmq
::
message_t
*
reply
=
new
zmq
::
message_t
;
socket
.
recv
(
reply
,
0
);
auto_ptr
<
zmq
::
message_t
>
reply
(
new
zmq
::
message_t
())
;
socket
.
recv
(
reply
.
get
()
,
0
);
return
reply
;
}
...
...
@@ -331,10 +331,9 @@ std::string ServicesImpl::createTerminatedUnmanagedRequest(int id) const {
bool
ServicesImpl
::
isAvailable
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
timeout
)
{
try
{
zmq
::
message_t
*
reply
=
0
;
reply
=
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
c_str
(),
timeout
);
auto_ptr
<
zmq
::
message_t
>
reply
=
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
.
c_str
(),
timeout
);
if
(
reply
!=
0
)
{
if
(
reply
.
get
()
!=
0
)
{
return
true
;
}
...
...
@@ -367,12 +366,13 @@ void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, const std::stri
}
void
ServicesImpl
::
subscribeToPublisher
(
const
std
::
string
&
endpoint
)
{
string
strRequestType
=
createRequest
(
PROTO_SUBSCRIBEPUBLISHER
);
string
strRequestData
=
createSubscribePublisherRequest
();
zmq
::
message_t
*
reply
=
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
);
auto_ptr
<
zmq
::
message_t
>
reply
=
tryRequestWithOnePartReply
(
strRequestType
,
strRequestData
,
endpoint
);
proto
::
RequestResponse
requestResponse
;
requestResponse
.
ParseFromArray
((
*
reply
).
data
(),
(
*
reply
).
size
());
delete
reply
;
}
/**
...
...
src/cameo/impl/ServicesImpl.h
View file @
b4b4f3cd
...
...
@@ -18,9 +18,10 @@
#define CAMEO_SERVICESIMPL_H_
#include
"../../proto/Messages.pb.h"
#include
"../ProtoType.h"
#include
<vector>
#include
<memory>
#include
"zmq.hpp"
#include
"../ProtoType.h"
namespace
cameo
{
...
...
@@ -63,7 +64,7 @@ public:
zmq
::
socket_t
*
createEventSubscriber
(
const
std
::
string
&
endpoint
,
const
std
::
string
&
cancelEndpoint
);
zmq
::
socket_t
*
createCancelPublisher
(
const
std
::
string
&
endpoint
);
zmq
::
message_t
*
tryRequestWithOnePartReply
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
overrideTimeout
=
-
1
);
std
::
auto_ptr
<
zmq
::
message_t
>
tryRequestWithOnePartReply
(
const
std
::
string
&
strRequestType
,
const
std
::
string
&
strRequestData
,
const
std
::
string
&
endpoint
,
int
overrideTimeout
=
-
1
);
std
::
string
createShowStreamRequest
(
int
id
)
const
;
proto
::
MessageType_Type
convertToProtoType
(
ProtoType
type
)
const
;
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment