Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Shervin Nourbakhsh
cameo
Commits
772c42ec
Commit
772c42ec
authored
Mar 26, 2020
by
legoc
Browse files
Implementation of C++ responder timeout
parent
b580fcc8
Changes
9
Hide whitespace changes
Inline
Side-by-side
src/cameo/Application.cpp
View file @
772c42ec
...
...
@@ -968,6 +968,10 @@ Request::Request(std::unique_ptr<RequestImpl> & impl) :
Request
::~
Request
()
{
}
void
Request
::
setTimeout
(
int
value
)
{
m_impl
->
setTimeout
(
value
);
}
const
std
::
string
&
Request
::
getBinary
()
const
{
return
m_impl
->
m_message
;
}
...
...
src/cameo/Application.h
View file @
772c42ec
...
...
@@ -396,6 +396,8 @@ public:
std
::
string
get
()
const
;
const
std
::
string
&
getSecondBinaryPart
()
const
;
void
setTimeout
(
int
value
);
void
replyBinary
(
const
std
::
string
&
response
);
void
reply
(
const
std
::
string
&
response
);
...
...
src/cameo/Services.cpp
View file @
772c42ec
...
...
@@ -171,4 +171,8 @@ std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::stri
return
unique_ptr
<
RequestSocketImpl
>
(
new
RequestSocketImpl
(
m_impl
->
createRequestSocket
(
endpoint
),
m_impl
->
m_timeout
));
}
std
::
unique_ptr
<
RequestSocketImpl
>
Services
::
createRequestSocket
(
const
std
::
string
&
endpoint
,
int
timeout
)
{
return
unique_ptr
<
RequestSocketImpl
>
(
new
RequestSocketImpl
(
m_impl
->
createRequestSocket
(
endpoint
),
timeout
));
}
}
src/cameo/Services.h
View file @
772c42ec
...
...
@@ -51,6 +51,7 @@ public:
std
::
unique_ptr
<
EventStreamSocket
>
openEventStream
();
std
::
unique_ptr
<
OutputStreamSocket
>
createOutputStreamSocket
(
int
port
);
std
::
unique_ptr
<
RequestSocketImpl
>
createRequestSocket
(
const
std
::
string
&
endpoint
);
std
::
unique_ptr
<
RequestSocketImpl
>
createRequestSocket
(
const
std
::
string
&
endpoint
,
int
timeout
);
std
::
string
m_serverEndpoint
;
std
::
string
m_url
;
...
...
src/cameo/impl/RequestImpl.cpp
View file @
772c42ec
...
...
@@ -30,7 +30,8 @@ RequestImpl::RequestImpl(application::This * application, const std::string & re
m_application
(
application
),
m_message
(
message
),
m_requesterApplicationName
(
requesterApplicationName
),
m_requesterApplicationId
(
requesterApplicationId
)
{
m_requesterApplicationId
(
requesterApplicationId
),
m_timeout
(
0
)
{
stringstream
requesterEndpoint
;
requesterEndpoint
<<
serverUrl
<<
":"
<<
requesterPort
;
...
...
@@ -44,11 +45,22 @@ RequestImpl::RequestImpl(application::This * application, const std::string & re
RequestImpl
::~
RequestImpl
()
{
}
void
RequestImpl
::
setTimeout
(
int
value
)
{
m_timeout
=
value
;
}
void
RequestImpl
::
replyBinary
(
const
std
::
string
&
response
)
{
// Create a request socket. It is created for each request that could be optimized.
unique_ptr
<
RequestSocketImpl
>
requestSocket
=
m_application
->
createRequestSocket
(
m_requesterEndpoint
);
requestSocket
->
request
(
m_application
->
m_impl
->
createRequestType
(
PROTO_RESPONSE
),
response
);
unique_ptr
<
RequestSocketImpl
>
requestSocket
=
m_application
->
createRequestSocket
(
m_requesterEndpoint
,
m_timeout
);
//requestSocket->requestAsync(m_application->m_impl->createRequestType(PROTO_RESPONSE), response);
try
{
requestSocket
->
request
(
m_application
->
m_impl
->
createRequestType
(
PROTO_RESPONSE
),
response
);
}
catch
(
const
ConnectionTimeout
&
)
{
cout
<<
"timeout while replying"
<<
endl
;
}
}
void
RequestImpl
::
reply
(
const
std
::
string
&
response
)
{
...
...
src/cameo/impl/RequestImpl.h
View file @
772c42ec
...
...
@@ -33,6 +33,8 @@ public:
RequestImpl
(
application
::
This
*
application
,
const
std
::
string
&
requesterApplicationName
,
int
requesterApplicationId
,
const
std
::
string
&
message
,
const
std
::
string
&
serverUrl
,
int
serverPort
,
int
requesterPort
);
~
RequestImpl
();
void
setTimeout
(
int
value
);
void
replyBinary
(
const
std
::
string
&
response
);
void
reply
(
const
std
::
string
&
response
);
...
...
@@ -43,6 +45,7 @@ public:
std
::
string
m_requesterApplicationName
;
int
m_requesterApplicationId
;
std
::
string
m_requesterServerEndpoint
;
int
m_timeout
;
};
}
...
...
src/cameo/impl/RequestSocketImpl.cpp
View file @
772c42ec
...
...
@@ -18,6 +18,9 @@
#include
"../ConnectionTimeout.h"
#include
<iostream>
#include
<chrono>
#include
<thread>
using
namespace
std
;
namespace
cameo
{
...
...
@@ -59,7 +62,6 @@ std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& re
int
rc
=
zmq
::
poll
(
items
,
1
,
timeout
);
if
(
rc
==
0
)
{
// Timeout occurred.
m_socket
->
close
();
throw
ConnectionTimeout
();
}
}
...
...
@@ -71,4 +73,24 @@ std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& re
return
reply
;
}
void
RequestSocketImpl
::
requestAsync
(
const
std
::
string
&
requestTypePart
,
const
std
::
string
&
requestDataPart
)
{
// Prepare the request parts.
int
requestTypeSize
=
requestTypePart
.
length
();
int
requestDataSize
=
requestDataPart
.
length
();
zmq
::
message_t
requestType
(
requestTypeSize
);
zmq
::
message_t
requestData
(
requestDataSize
);
memcpy
(
static_cast
<
void
*>
(
requestType
.
data
()),
requestTypePart
.
c_str
(),
requestTypeSize
);
memcpy
(
static_cast
<
void
*>
(
requestData
.
data
()),
requestDataPart
.
c_str
(),
requestDataSize
);
// Send the request in two parts.
m_socket
->
send
(
requestType
,
ZMQ_SNDMORE
);
m_socket
->
send
(
requestData
);
// ...
// Close the socket as we do not need to wait for the reply.
m_socket
->
close
();
}
}
src/cameo/impl/RequestSocketImpl.h
View file @
772c42ec
...
...
@@ -30,6 +30,7 @@ public:
virtual
~
RequestSocketImpl
();
std
::
unique_ptr
<
zmq
::
message_t
>
request
(
const
std
::
string
&
requestTypePart
,
const
std
::
string
&
requestDataPart
,
int
overrideTimeout
=
-
1
);
void
requestAsync
(
const
std
::
string
&
requestTypePart
,
const
std
::
string
&
requestDataPart
);
std
::
unique_ptr
<
zmq
::
socket_t
>
m_socket
;
int
m_timeout
;
...
...
src/cameo/impl/ServicesImpl.cpp
View file @
772c42ec
...
...
@@ -257,8 +257,8 @@ zmq::socket_t * ServicesImpl::createRequestSocket(const std::string& endpoint) {
try
{
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int
value
=
0
;
socket
->
setsockopt
(
ZMQ_LINGER
,
&
value
,
sizeof
(
int
));
//
int value = 0;
//
socket->setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket
->
connect
(
endpoint
.
c_str
());
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment