Commit 78bfc237 authored by yannick legoc's avatar yannick legoc
Browse files

Adapted Mantid reduction test to new requests and subscriber

parent 5822627d
/RequestMessages.pb.cc
/RequestMessages.pb.h
......@@ -2,4 +2,6 @@
<controller class="mantid::ReductionControllerTest"/>
<include path="$(NOMAD_HOME)/../NomadModules/src"/>
<script exec="protoc --cpp_out=. RequestMessages.proto"/>
</module>
......@@ -18,6 +18,7 @@
#include "ReductionControllerTest.h"
#include "controllers/common/family/Families.h"
#include "RequestMessages.pb.h"
#include <boost/lexical_cast.hpp>
#include <common/base/Date.h>
#include <fstream>
......@@ -32,8 +33,10 @@ using namespace boost;
using namespace cameo;
const std::string ReductionControllerTest::TYPE = "mantid_reduction_test";
const std::string ReductionControllerTest::REMOTE_APPLICATION = "responder-server";
const std::string ReductionControllerTest::REMOTE_APPLICATION = "madmanserver";
const std::string ReductionControllerTest::RESPONDER = "responder";
const std::string ReductionControllerTest::PUBLISHER = "publisher";
const std::string ReductionControllerTest::PROCESSED_FILE = "processed.nxs";
ReductionControllerTest::ReductionControllerTest(const std::string& name) :
ExperimentController(name), controller::Stoppable(this) {
......@@ -55,7 +58,12 @@ ReductionControllerTest::ReductionControllerTest(const ReductionControllerTest&
ReductionControllerTest::~ReductionControllerTest() {
if (m_remoteApplication.get() != NULL) {
if (m_subscriberThread.get() != nullptr) {
m_subscriber->cancel();
m_subscriberThread->join();
}
if (m_remoteApplication.get() != nullptr) {
// Stop the remote application.
m_remoteApplication->stop();
......@@ -108,6 +116,41 @@ bool ReductionControllerTest::initApplication() {
return false;
}
// Create the requester
m_requester = application::Requester::create(*m_remoteApplication, RESPONDER);
if (m_requester.get() == 0) {
cout << "requester error" << endl;
return false;
}
// Create the subscriber.
m_subscriber = application::Subscriber::create(*m_remoteApplication, "publisher");
m_subscriberThread.reset(new std::thread([this] {
string result;
while (m_subscriber->receiveBinary(result)) {
// Parse the response.
madman::Result fileResult;
fileResult.ParseFromString(result);
// Write the file to disk.
writeFile(PROCESSED_FILE, fileResult.content());
// Read the file.
readProcessedFile();
// Remove the file.
remove(PROCESSED_FILE.c_str());
cout << "Result size " << fileResult.content().size() << endl;
}
}));
// Application initialized
initialized = true;
......@@ -152,7 +195,7 @@ void ReductionControllerTest::readProcessedFile() {
NXhandle fileId;
// Open the file.
int res = NXopen("processed.nxs", NXACC_RDWR, &fileId);
int res = NXopen(PROCESSED_FILE.c_str(), NXACC_RDWR, &fileId);
if (res != NX_OK) {
cerr << "Cannot open processed.nxs." << endl;
return;
......@@ -230,6 +273,8 @@ void ReductionControllerTest::readProcessedFile() {
NXclosegroup(fileId);
NXclose(&fileId);
cout << "Data size " << dataSize << endl;
// Check the data.
int index = 0;
while (index < dataSize) {
......@@ -241,15 +286,15 @@ void ReductionControllerTest::readProcessedFile() {
++index;
}
cout << "Forced sizes to " << index << endl;
cout << "Forced size to " << index << endl;
// x.setSize(index);
// data.setSize(index);
x.setSize(index);
data.setSize(index);
int fixedSize = 280;
/* int fixedSize = 280;
x.setSize(index < fixedSize ? index : fixedSize);
data.setSize(index < fixedSize ? index : fixedSize);
data.setSize(index < fixedSize ? index : fixedSize);*/
x.sendEvent();
data.sendEvent();
......@@ -269,37 +314,26 @@ void ReductionControllerTest::start() {
// Read the file.
readFile(fileDirectory() + "/" + filename(), fileContent);
// Create the requester
unique_ptr<application::Requester> requester = application::Requester::create(*m_remoteApplication, RESPONDER);
// Request type.
madman::Request type;
if (requester.get() == 0) {
cout << "requester error" << endl;
return;
}
// Set the type.
type.set_type(madman::Request::File);
// Start date.
Date begin;
// Create the file request.
madman::FileRequest request;
request.set_content(fileContent);
// Send the file content to the server.
requester->sendBinary(fileContent);
m_requester->sendTwoBinaryParts(type.SerializeAsString(), request.SerializeAsString());
// Wait for the response from the server.
string response;
requester->receiveBinary(response);
m_requester->receive(response);
// Print the duration.
Date end;
double ms = (end - begin).getMilliseconds();
cout << "File processed in " << ms << " ms" << endl;
// Write the file to disc.
writeFile("processed.nxs", response);
// Read the file.
readProcessedFile();
// Remove the file.
remove("processed.nxs");
}
void ReductionControllerTest::stop() {
......
......@@ -55,9 +55,14 @@ private:
static const std::string REMOTE_APPLICATION;
static const std::string RESPONDER;
static const std::string PUBLISHER;
static const std::string PROCESSED_FILE;
std::unique_ptr<cameo::Server> m_server;
std::unique_ptr<cameo::application::Instance> m_remoteApplication;
std::unique_ptr<cameo::application::Requester> m_requester;
std::unique_ptr<cameo::application::Subscriber> m_subscriber;
std::unique_ptr<std::thread> m_subscriberThread;
};
}
......
package madman;
option optimize_for = LITE_RUNTIME;
message Request {
enum Type {
Reset = 1;
File = 2;
}
required Type type = 1;
}
message FileRequest {
required bytes content = 1;
}
message EmptyRequest {
}
message Result {
required bytes content = 1;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment