Commit 6ea608e1 authored by legoc's avatar legoc

Adapted to new version of cameo-api-cpp 1.0.0, RemoteDPPCoincidence and

RemoteDPPHistogram do not work anymore
parent 2bbd47f8
<module name="remotefiletest">
<controller class="test::RemoteFilePub"/>
<controller class="test::RemoteFileRep"/>
</module>
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include <boost/lexical_cast.hpp>
#include <common/base/Date.h>
#include <fstream>
#include <iostream>
#include "RemoteFilePub.h"
namespace test {
using namespace std;
using namespace common;
using namespace boost;
using namespace cameo;
const std::string RemoteFilePub::TYPE = "remote_file_pub";
const std::string RemoteFilePub::REMOTE_APPLICATION = "nfilepub";
const std::string RemoteFilePub::FILE_PUBLISHER = "file_publisher";
const std::string RemoteFilePub::RESULT_PUBLISHER = "file_result";
RemoteFilePub::RemoteFilePub(const std::string& name) :
ExperimentController(name),
controller::Stoppable(this) {
serverEndpoint.init(this, SAVE, "cameo_server");
initialized.init(this, NOSAVE, "initialized");
filename.init(this, SAVE, "filename");
}
RemoteFilePub::RemoteFilePub(const RemoteFilePub& controller) :
ExperimentController(controller),
controller::Stoppable(this) {
}
RemoteFilePub::~RemoteFilePub() {
// Stop the subscriber
m_subscriber->cancel();
// Notify the application
m_publisher->sendEnd();
// Wait for the termination
application::State state = m_remoteApplication->waitFor();
cout << "Remote application " << m_remoteApplication->getName() << " terminated with state " << application::toString(state) << endl;
}
void RemoteFilePub::postConfiguration() {
if (serverEndpoint() == "") {
m_server.reset(new Server(application::This::getServer().getEndpoint()));
} else {
m_server.reset(new Server(serverEndpoint(), 1000));
}
initialized = initApplication();
}
bool RemoteFilePub::initApplication() {
// Do not initialize if it is already done
if (initialized()) {
return true;
}
// Start the remote server
if (!m_server->isAvailable(1000)) {
cout << "Remote server is not available" << endl;
return false;
}
cout << "Remote server is connected" << endl;
m_remoteApplication = m_server->connect(REMOTE_APPLICATION);
if (m_remoteApplication->exists()) {
// The application exists from a previous server session
m_remoteApplication->kill();
application::State state = m_remoteApplication->waitFor();
cout << "Terminated remote application with state " << application::toString(state) << endl;
}
m_remoteApplication = m_server->start(REMOTE_APPLICATION);
if (!m_remoteApplication->exists()) {
cout << "No remote application" << endl;
return false;
}
// Create the publisher after the start of the application, because it is a blocking call
m_publisher = application::Publisher::create(FILE_PUBLISHER, 1);
cout << "Publisher " << *m_publisher << endl;
// Wait for the subscriber
bool sync = m_publisher->waitForSubscribers();
// Create the subscriber after the publisher
m_subscriber = application::Subscriber::create(*m_remoteApplication, RESULT_PUBLISHER);
cout << "subscriber " << *m_subscriber << endl;
// Application initialized
initialized = true;
return true;
}
void RemoteFilePub::sendFile() {
if (!initApplication()) {
return;
}
ifstream numorFile;
numorFile.open(filename().c_str());
stringstream stream;
stream << numorFile.rdbuf();
Date begin;
string content = stream.str();
// Publish the message
m_publisher->sendBinary(content);
// Wait for result
vector<int32> result;
if (m_subscriber->receive(result)) {
Date end;
double ms = (end - begin).getMilliseconds();
cout << "Sent and received file in " << ms << " ms (" << (content.size() / ms) << " ko/s)" << endl;
if (content.size() != result[0]) {
cout << "\tProblem with size" << endl;
}
}
}
void RemoteFilePub::start() {
while (true) {
// Send the numor
sendFile();
if (isStopped()) {
break;
}
}
}
void RemoteFilePub::stop() {
}
}
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef TEST_REMOTEFILEPUB_H
#define TEST_REMOTEFILEPUB_H
#include <Controller.h>
#include <cameo/cameo.h>
namespace test {
class RemoteFilePub : public ExperimentController,
public controller::Stoppable {
public:
//! Type of controller
static const std::string TYPE;
RemoteFilePub(const std::string& name);
RemoteFilePub(const RemoteFilePub& controller);
virtual ~RemoteFilePub();
virtual void postConfiguration();
virtual void start();
virtual void stop();
Property<std::string> serverEndpoint;
Property<bool> initialized;
Property<std::string> filename;
private:
bool initApplication();
void sendFile();
static const std::string REMOTE_APPLICATION;
static const std::string FILE_PUBLISHER;
static const std::string RESULT_PUBLISHER;
std::unique_ptr<cameo::Server> m_server;
std::unique_ptr<cameo::application::Instance> m_remoteApplication;
std::unique_ptr<cameo::application::Publisher> m_publisher;
std::unique_ptr<cameo::application::Subscriber> m_subscriber;
};
}
#endif
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include <boost/lexical_cast.hpp>
#include <common/base/Date.h>
#include <fstream>
#include <iostream>
#include "RemoteFileRep.h"
#include <thread>
#include <memory>
std::thread t;
std::unique_ptr<int> u;
namespace test {
using namespace std;
using namespace common;
using namespace boost;
using namespace cameo;
const std::string RemoteFileRep::TYPE = "remote_file_rep";
const std::string RemoteFileRep::REMOTE_APPLICATION = "nfilerep";
RemoteFileRep::RemoteFileRep(const std::string& name) :
ExperimentController(name),
controller::Stoppable(this) {
serverEndpoint.init(this, SAVE, "cameo_server");
initialized.init(this, NOSAVE, "initialized");
filename.init(this, SAVE, "filename");
}
RemoteFileRep::RemoteFileRep(const RemoteFileRep& controller) :
ExperimentController(controller),
controller::Stoppable(this) {
}
RemoteFileRep::~RemoteFileRep() {
// Stop the remote application.
m_remoteApplication->stop();
// Wait for the termination
application::State state = m_remoteApplication->waitFor();
cout << "Remote application " << m_remoteApplication->getName() << " terminated with state " << application::toString(state) << endl;
}
void RemoteFileRep::postConfiguration() {
if (serverEndpoint() == "") {
m_server.reset(new Server(application::This::getServer().getEndpoint()));
} else {
m_server.reset(new Server(serverEndpoint(), 1000));
}
initialized = initApplication();
}
bool RemoteFileRep::initApplication() {
// Do not initialize if it is already done
if (initialized()) {
return true;
}
// Start the remote server
if (!m_server->isAvailable(1000)) {
cout << "Remote server is not available" << endl;
return false;
}
cout << "Remote server is connected" << endl;
m_remoteApplication = m_server->connect(REMOTE_APPLICATION);
if (m_remoteApplication->exists()) {
// The application exists from a previous server session
m_remoteApplication->kill();
application::State state = m_remoteApplication->waitFor();
cout << "Terminated remote application with state " << application::toString(state) << endl;
}
m_remoteApplication = m_server->start(REMOTE_APPLICATION);
if (!m_remoteApplication->exists()) {
cout << "No remote application" << endl;
return false;
}
// Application initialized
initialized = true;
return true;
}
void RemoteFileRep::sendFile() {
if (!initApplication()) {
return;
}
ifstream numorFile;
numorFile.open(filename().c_str());
stringstream stream;
stream << numorFile.rdbuf();
Date begin;
string content = stream.str();
// Create the requester
unique_ptr<application::Requester> requester = application::Requester::create(*m_remoteApplication, "file_responder");
if (requester.get() == 0) {
cout << "requester error" << endl;
return;
}
// Send the content in binary
requester->sendBinary(content);
string response;
requester->receiveBinary(response);
Date end;
double ms = (end - begin).getMilliseconds();
cout << "Sent and received file in " << ms << " ms (" << (content.size() / ms) << " ko/s)" << endl;
// if (content.size() != result[0]) {
// cout << "\tProblem with size" << endl;
// }
}
void RemoteFileRep::start() {
while (true) {
// Send the numor
sendFile();
if (isStopped()) {
break;
}
}
}
void RemoteFileRep::stop() {
}
}
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef TEST_REMOTEFILEREP_H
#define TEST_REMOTEFILEREP_H
#include <Controller.h>
#include <cameo/cameo.h>
namespace test {
class RemoteFileRep : public ExperimentController,
public controller::Stoppable {
public:
//! Type of controller
static const std::string TYPE;
RemoteFileRep(const std::string& name);
RemoteFileRep(const RemoteFileRep& controller);
virtual ~RemoteFileRep();
virtual void postConfiguration();
virtual void start();
virtual void stop();
Property<std::string> serverEndpoint;
Property<bool> initialized;
Property<std::string> filename;
private:
bool initApplication();
void sendFile();
static const std::string REMOTE_APPLICATION;
static const std::string FILE_RESPONDER;
std::unique_ptr<cameo::Server> m_server;
std::unique_ptr<cameo::application::Instance> m_remoteApplication;
};
}
#endif
......@@ -187,31 +187,31 @@ void RemoteDPPCoincidence::start() {
unique_ptr<boost::thread> thread = startFileReading();
// Listen to the result stream
vector<double> channelRates;
while (subscriber->receive(channelRates)) {
// copying the rates
for (int32 i = 0; i < totalNumberOfChannels(); i++) {
channelRate.set(i, channelRates[i]);
}
vector<double> coincidenceRates;
if (!subscriber->receive(coincidenceRates)) {
break;
}
if (coincidenceRates.size() <= 4 * 7) {
// Copying values
for (int32 i = 0; i < 7; ++i) {
globalRate.set(i, coincidenceRates[i]);
globalCleanRate.set(i, coincidenceRates[7 + i]);
detectorRate.set(i, coincidenceRates[7 * 2 + i]);
detectorCleanRate.set(i, coincidenceRates[7 * 3 + i]);
}
} else {
cerr << "problem with coincidence rates size" << endl;
}
}
// vector<double> channelRates;
// while (subscriber->receive(channelRates)) {
// // copying the rates
// for (int32 i = 0; i < totalNumberOfChannels(); i++) {
// channelRate.set(i, channelRates[i]);
// }
//
// vector<double> coincidenceRates;
// if (!subscriber->receive(coincidenceRates)) {
// break;
// }
//
// if (coincidenceRates.size() <= 4 * 7) {
// // Copying values
// for (int32 i = 0; i < 7; ++i) {
// globalRate.set(i, coincidenceRates[i]);
// globalCleanRate.set(i, coincidenceRates[7 + i]);
// detectorRate.set(i, coincidenceRates[7 * 2 + i]);
// detectorCleanRate.set(i, coincidenceRates[7 * 3 + i]);
// }
//
// } else {
// cerr << "problem with coincidence rates size" << endl;
// }
// }
// Wait for the termination of the application
application::State state = m_coincidenceApplication->waitFor();
......
......@@ -161,51 +161,51 @@ void RemoteDPPHistogram::start() {
unique_ptr<boost::thread> thread = startFileReading();
// Listen to the result stream
vector<double> channelRates;
while (subscriber->receive(channelRates)) {
//cout << "received rates " << channelRates.size() << endl;
// Copying the rates
for (int32 i = 0; i < totalNumberOfChannels(); i++) {
channelRate.set(i, channelRates[i]);
//cout << "rate " << i << " = " << channelRate(i) << endl;
}
vector<int32_t> histograms;
if (!subscriber->receive(histograms)) {
break;
}
// Copying the data
vector<int32_t>::const_iterator h = histograms.begin();
for (int32 i = 0; i < totalNumberOfChannels(); i++) {
if (h == histograms.end()) {
cerr << "problem with the histogram data" << endl;
break;
}
int32 size = histogram.getSize(i);
int32 * data = histogram(i);
for (int32 j = 0; j < size; ++j) {
if (h == histograms.end()) {
cerr << "problem with the histogram data" << endl;
break;
}
data[j] = *h;
++h;
}
if (h == histograms.end()) {
// ok
}
histogram.sendEvent(i);
}
}
// vector<double> channelRates;
// while (subscriber->receive(channelRates)) {
//
// //cout << "received rates " << channelRates.size() << endl;
//
// // Copying the rates
// for (int32 i = 0; i < totalNumberOfChannels(); i++) {
// channelRate.set(i, channelRates[i]);
// //cout << "rate " << i << " = " << channelRate(i) << endl;
// }
//
// vector<int32_t> histograms;
// if (!subscriber->receive(histograms)) {
// break;
// }
//
// // Copying the data
// vector<int32_t>::const_iterator h = histograms.begin();
// for (int32 i = 0; i < totalNumberOfChannels(); i++) {
//
// if (h == histograms.end()) {
// cerr << "problem with the histogram data" << endl;
// break;
// }
//
// int32 size = histogram.getSize(i);
// int32 * data = histogram(i);
//
// for (int32 j = 0; j < size; ++j) {
// if (h == histograms.end()) {
// cerr << "problem with the histogram data" << endl;
// break;
// }
//
// data[j] = *h;
// ++h;
// }
//
// if (h == histograms.end()) {
// // ok
// }
//
// histogram.sendEvent(i);
// }
// }
// Wait for the termination of the application
application::State state = m_histoApplication->waitFor();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment