...
 
Commits (2)
* Use timeout in all the responders (cameo api cpp 0.3.* required)
* Changed call to now() in SessionResponder by call to getActualState().
* First multi-client implementation.
* Removed ScannablePropertiesLoader.
......
......@@ -19,16 +19,20 @@
#include "ConditionManagerResponder.h"
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/ConditionManagerRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <core/conditions/ConditionManager.h>
#include <core/session/SessionManager.h>
using namespace std;
using namespace cameo;
using namespace common;
namespace core {
ConditionManagerResponder::ConditionManagerResponder() {
const std::string ConditionManagerResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
ConditionManagerResponder::ConditionManagerResponder() : m_replyTimeout(0) {
}
ConditionManagerResponder::~ConditionManagerResponder() {
......@@ -42,6 +46,14 @@ void ConditionManagerResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&ConditionManagerResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised ConditionManagerResponder" << endl;
}
......@@ -378,6 +390,9 @@ void ConditionManagerResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -399,7 +414,9 @@ void ConditionManagerResponder::loop() {
cerr << "Condition manager responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -86,6 +86,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -22,6 +22,7 @@
#include <dataprovider/database/Database.h>
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/DatabaseRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <core/scheduler/Sender.h>
#include <core/session/SessionManager.h>
......@@ -32,12 +33,15 @@
using namespace std;
using namespace boost;
using namespace cameo;
using namespace common;
using namespace dataprovider;
namespace core {
const std::string DatabaseResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
DatabaseResponder::DatabaseResponder(dataprovider::PropertyAccessorDirectImpl* propertyAccessor, dataprovider::CommandAccessorDirectImpl* commandAccessor) :
m_propertyAccessor(propertyAccessor), m_commandAccessor(commandAccessor) {
m_propertyAccessor(propertyAccessor), m_commandAccessor(commandAccessor), m_replyTimeout(0) {
}
DatabaseResponder::~DatabaseResponder() {
......@@ -51,6 +55,14 @@ void DatabaseResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&DatabaseResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised DatabaseResponder" << endl;
}
......@@ -629,6 +641,9 @@ void DatabaseResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -650,7 +665,9 @@ void DatabaseResponder::loop() {
cerr << "Database manager responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -116,6 +116,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -21,6 +21,7 @@
#include <dataprovider/iconandfamily/ServantIconAndFamilyMaps.h>
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/ServantManagerRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <core/cloner/ServantCloner.h>
#include <core/dataaccess/DataAccess.h>
......@@ -37,12 +38,16 @@
using namespace std;
using namespace boost;
using namespace cameo;
using namespace common;
using namespace dataprovider;
namespace core {
const std::string ServantManagerResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
ServantManagerResponder::ServantManagerResponder(dataprovider::ServantAccessorDirectImpl* servantAccessor) :
m_servantAccessor(servantAccessor) {
m_servantAccessor(servantAccessor),
m_replyTimeout(0) {
}
ServantManagerResponder::~ServantManagerResponder() {
......@@ -56,6 +61,14 @@ void ServantManagerResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&ServantManagerResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised ServantManagerResponder" << endl;
}
......@@ -559,6 +572,9 @@ void ServantManagerResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -580,7 +596,9 @@ void ServantManagerResponder::loop() {
cerr << "Servant manager responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -118,6 +118,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -19,6 +19,7 @@
#include "SessionResponder.h"
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/SessionRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
using namespace std;
......@@ -27,7 +28,9 @@ using namespace common;
namespace core {
SessionResponder::SessionResponder() {
const std::string SessionResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
SessionResponder::SessionResponder() : m_replyTimeout(0) {
}
SessionResponder::~SessionResponder() {
......@@ -41,6 +44,14 @@ void SessionResponder::init() {
// Start the thread.
m_thread.reset(new std::thread(std::bind(&SessionResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised SessionResponder" << endl;
}
......@@ -233,6 +244,9 @@ void SessionResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -254,7 +268,9 @@ void SessionResponder::loop() {
cerr << "Session manager responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
}
}
......
......@@ -70,6 +70,9 @@ private:
std::unique_ptr<std::thread> m_thread;
std::unique_ptr<cameo::application::Responder> m_responder;
std::map<int, ProcessFunctionType> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -19,6 +19,7 @@
#include "UserVariableManagerResponder.h"
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/VariableManagerRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <core/commandzone/CommandZone.h>
#include <core/variables/UserVariableManager.h>
......@@ -27,10 +28,13 @@
using namespace std;
using namespace cameo;
using namespace common;
namespace core {
UserVariableManagerResponder::UserVariableManagerResponder() {
const std::string UserVariableManagerResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
UserVariableManagerResponder::UserVariableManagerResponder() : m_replyTimeout(0) {
}
UserVariableManagerResponder::~UserVariableManagerResponder() {
......@@ -43,6 +47,13 @@ void UserVariableManagerResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&UserVariableManagerResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised VariableManagerResponder" << endl;
}
......@@ -234,6 +245,9 @@ void UserVariableManagerResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -255,7 +269,9 @@ void UserVariableManagerResponder::loop() {
cerr << "User variable manager responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -79,6 +79,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -19,6 +19,8 @@
#include "CommandLineResponder.h"
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/CommandLineRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <ics/CommandSystem/CommandLine/CommandLineManager.h>
#include <core/session/SessionManager.h>
#include <string>
......@@ -26,10 +28,13 @@
using namespace std;
using namespace boost;
using namespace cameo;
using namespace common;
namespace core {
CommandLineResponder::CommandLineResponder() {
const std::string CommandLineResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
CommandLineResponder::CommandLineResponder() : m_replyTimeout(0) {
}
CommandLineResponder::~CommandLineResponder() {
......@@ -43,6 +48,14 @@ void CommandLineResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&CommandLineResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised CommandLineResponder" << endl;
}
......@@ -174,6 +187,9 @@ void CommandLineResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -195,7 +211,9 @@ void CommandLineResponder::loop() {
cerr << "Command line responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -83,6 +83,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......
......@@ -19,6 +19,7 @@
#include "CommandZoneResponder.h"
#include <common/protobuf/generated/CommonResponses.pb.h>
#include <common/protobuf/generated/CommandZoneRequests.pb.h>
#include <common/base/ServerProperties.h>
#include <common/output/OutputStream.h>
#include <core/commandzone/CommandBox.h>
#include <core/commandzone/AtomicCommandBox.h>
......@@ -43,7 +44,9 @@ using namespace common;
namespace core {
CommandZoneResponder::CommandZoneResponder() {
const std::string CommandZoneResponder::REQUEST_REPLY_TIMEOUT = "requestReplyTimeout";
CommandZoneResponder::CommandZoneResponder() : m_replyTimeout(0) {
}
CommandZoneResponder::~CommandZoneResponder() {
......@@ -57,6 +60,14 @@ void CommandZoneResponder::init() {
// Start the thread.
m_thread.reset(new boost::thread(boost::bind(&CommandZoneResponder::loop, this)));
// Set the request reply timeout.
try {
m_replyTimeout = ServerProperties::getInstance()->get<int>(REQUEST_REPLY_TIMEOUT);
} catch (const Exception& e) {
m_replyTimeout = 1000;
}
cout << "Initialised CommandZoneResponder" << endl;
}
......@@ -3217,6 +3228,9 @@ void CommandZoneResponder::loop() {
break;
}
// Set the timeout for not blocking when replying.
request->setTimeout(m_replyTimeout);
// Get the request type.
string firstPart = request->getBinary();
......@@ -3239,7 +3253,9 @@ void CommandZoneResponder::loop() {
cerr << "Command zone responder cannot process " << messageType.type() << endl;
}
request->replyBinary(response);
if (!request->replyBinary(response)) {
slog << "Timeout occurred after " << m_replyTimeout << "ms with " << request->getObjectId() << eol;
}
// Notify the clients in case the function modifies the command zone.
if (!processFunction.readonly) {
......
......@@ -199,6 +199,9 @@ private:
};
std::map<int, ProcessFunction> m_processFunctions;
int m_replyTimeout;
static const std::string REQUEST_REPLY_TIMEOUT;
};
}
......