Commit 1b991538 authored by legoc's avatar legoc

Added tests for multiple parts messages of publisher/subscriber.

parent ec6502c3
<?xml version="1.0" encoding="UTF-8"?>
<config port="6000" max_applications="20" log_directory="logs" debug="OFF" polling_time="100">
<config port="10000" max_applications="20" log_directory="logs" debug="OFF" polling_time="100">
<applications>
......@@ -64,6 +64,14 @@
<start executable="java" args="-classpath target/cameo-tests-jeromq-0.0.1-full.jar fr.ill.ics.cameo.test.TestNonSyncSubscribersAndPublisherApplication"/>
</application>
<application name="pub2java" starting_time="inf" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jeromq-0.0.1-full.jar fr.ill.ics.cameo.test.TestPublisherTwoApplication"/>
</application>
<application name="subpub2java" starting_time="0" retries="0" stopping_time="1" stream="yes" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jeromq-0.0.1-full.jar fr.ill.ics.cameo.test.TestSubscriberAndPublisherTwoApplication"/>
</application>
<application name="simplejava" starting_time="0" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jeromq-0.0.1-full.jar fr.ill.ics.cameo.test.TestSimpleApplication"/>
</application>
......@@ -141,6 +149,14 @@
<start executable="build/bin/testsubscriberandpublisher"/>
</application>
<application name="pub2cpp" starting_time="0" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="build/bin/testpublishertwo"/>
</application>
<application name="subpub2cpp" starting_time="0" retries="0" stopping_time="1" stream="yes" multiple="no" restart="no">
<start executable="build/bin/testsubscriberandpublishertwo"/>
</application>
<application name="nsubcpp" starting_time="0" retries="0" stopping_time="1" stream="no" log_directory="logs" multiple="yes" restart="no">
<start executable="build/bin/testnonsyncsubscriber"/>
</application>
......
<?xml version="1.0" encoding="UTF-8"?>
<config port="6000" max_applications="20" log_directory="logs" debug="OFF" polling_time="100">
<config port="10000" max_applications="20" log_directory="logs" debug="OFF" polling_time="100">
<applications>
......@@ -63,6 +63,14 @@
<application name="nsubpubjava" starting_time="0" retries="0" stopping_time="-1" stream="yes" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jzmq-0.0.1-full.jar fr.ill.ics.cameo.test.TestNonSyncSubscribersAndPublisherApplication"/>
</application>
<application name="pub2java" starting_time="inf" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jzmq-0.0.1-full.jar fr.ill.ics.cameo.test.TestPublisherTwoApplication"/>
</application>
<application name="subpub2java" starting_time="0" retries="0" stopping_time="1" stream="yes" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jzmq-0.0.1-full.jar fr.ill.ics.cameo.test.TestSubscriberAndPublisherTwoApplication"/>
</application>
<application name="simplejava" starting_time="0" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="java" args="-classpath target/cameo-tests-jzmq-0.0.1-full.jar fr.ill.ics.cameo.test.TestSimpleApplication"/>
......@@ -140,6 +148,14 @@
<application name="subpubcpp" starting_time="0" retries="0" stopping_time="1" stream="yes" multiple="no" restart="no">
<start executable="build/bin/testsubscriberandpublisher"/>
</application>
<application name="pub2cpp" starting_time="0" retries="0" stopping_time="1" stream="yes" log_directory="logs" multiple="no" restart="no">
<start executable="build/bin/testpublishertwo"/>
</application>
<application name="subpub2cpp" starting_time="0" retries="0" stopping_time="1" stream="yes" multiple="no" restart="no">
<start executable="build/bin/testsubscriberandpublishertwo"/>
</application>
<application name="nsubcpp" starting_time="0" retries="0" stopping_time="1" stream="no" log_directory="logs" multiple="yes" restart="no">
<start executable="build/bin/testnonsyncsubscriber"/>
......
......@@ -6,8 +6,10 @@ bin_PROGRAMS = \
testbadendpoint \
teststate \
testpublisher \
testpublishertwo \
testsubscriber \
testsubscriberandpublisher \
testsubscriberandpublishertwo \
testnonsyncsubscriber \
testnonsyncsubscribersandpublisher \
testresult \
......@@ -64,6 +66,14 @@ testpublisher_SOURCES = \
testpublisher_CPPFLAGS = $(CT_CXXFLAGS)
testpublisher_LDFLAGS = $(CT_LDFLAGS)
testpublisher_LDADD = $(CT_LIBS)
testpublishertwo_SOURCES = \
TestPublisherTwoApplication.cpp
testpublishertwo_CPPFLAGS = $(CT_CXXFLAGS)
testpublishertwo_LDFLAGS = $(CT_LDFLAGS)
testpublishertwo_LDADD = $(CT_LIBS)
testsubscriber_SOURCES = \
......@@ -81,6 +91,15 @@ testsubscriberandpublisher_CPPFLAGS = $(CT_CXXFLAGS)
testsubscriberandpublisher_LDFLAGS = $(CT_LDFLAGS)
testsubscriberandpublisher_LDADD = $(CT_LIBS)
testsubscriberandpublishertwo_SOURCES = \
TestSubscriberAndPublisherTwoApplication.cpp
testsubscriberandpublishertwo_CPPFLAGS = $(CT_CXXFLAGS)
testsubscriberandpublishertwo_LDFLAGS = $(CT_LDFLAGS)
testsubscriberandpublishertwo_LDADD = $(CT_LIBS)
testnonsyncsubscriber_SOURCES = \
TestNonSyncSubscriberApplication.cpp
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include <iostream>
#include <string>
#include <vector>
#include <sstream>
#include <cameo/cameo.h>
using namespace std;
using namespace cameo;
int main(int argc, char *argv[]) {
application::This::init(argc, argv);
// New block to ensure cameo objects are terminated before the application.
{
int numberOfSubscribers = 1;
if (argc > 2) {
istringstream is(argv[1]);
is >> numberOfSubscribers;
}
cout << "number of subscribers is " << numberOfSubscribers << endl;
if (application::This::isAvailable()) {
cout << "connected" << endl;
}
auto_ptr<application::Publisher> publisher;
try {
cout << "creating publisher and waiting for " << numberOfSubscribers << " subscriber(s)..." << endl;
publisher = application::Publisher::create("publisher", numberOfSubscribers);
publisher->waitForSubscribers();
} catch (const PublisherCreationException& e) {
cout << "publisher error" << endl;
return -1;
}
application::This::setRunning();
cout << "synchronized with " << numberOfSubscribers << " subscriber(s)" << endl;
// sending data
string buffer1, buffer2;
serialize("first", buffer1);
serialize("hello", buffer2);
publisher->sendTwoBinaryParts(buffer1, buffer2);
serialize("second", buffer1);
serialize("world", buffer2);
publisher->sendTwoBinaryParts(buffer1, buffer2);
serialize("third", buffer1);
serialize("!", buffer2);
publisher->sendTwoBinaryParts(buffer1, buffer2);
publisher->sendEnd();
cout << "finished the application" << endl;
}
return 0;
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include <iostream>
#include <string>
#include <vector>
#include <sstream>
#include <boost/lexical_cast.hpp>
#include <cameo/cameo.h>
using namespace std;
using namespace cameo;
int main(int argc, char *argv[]) {
application::This::init(argc, argv);
// New block to ensure cameo objects are terminated before the application.
{
string applicationName;
int numberOfTimes = 1;
if (argc > 2) {
applicationName = argv[1];
cout << "publisher application is " + applicationName << endl;
if (argc > 3) {
numberOfTimes = boost::lexical_cast<int>(argv[2]);
}
} else {
cerr << "arguments: [application name]" << endl;
return -1;
}
// get the client services
Server& server = application::This::getServer();
if (application::This::isAvailable() && server.isAvailable()) {
cout << "connected server " << server << endl;
}
// loop the number of times.
for (int i = 0; i < numberOfTimes; ++i) {
// start the application.
auto_ptr<application::Instance> publisherApplication = server.start(applicationName);
cout << "started application " << *publisherApplication << endl;
// create a subscriber to the application applicationName
auto_ptr<application::Subscriber> subscriber = application::Subscriber::create(*publisherApplication, "publisher");
cout << "created subscriber " << *subscriber << endl;
if (subscriber.get() == 0) {
cout << "subscriber error" << endl;
return -1;
}
application::This::setRunning();
// receiving data
string part1, part2;
string data1, data2;
while (subscriber->receiveTwoBinaryParts(part1, part2)) {
parse(part1, data1);
parse(part2, data2);
cout << "received " << data1 << " " << data2 << endl;
}
cout << "finished stream" << endl;
application::State state = publisherApplication->waitFor();
cout << "publisher application terminated with state " << application::toString(state) << endl;
}
}
return 0;
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.test;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.cameo.RemoteException;
import fr.ill.ics.cameo.impl.Buffer;
public class TestPublisherTwoApplication {
public static void main(String[] args) {
Application.This.init(args);
int numberOfSubscribers = 1;
if (args.length > 1) {
numberOfSubscribers = Integer.parseInt(args[0]);
}
System.out.println("number of subscribers is " + numberOfSubscribers);
if (Application.This.isAvailable()) {
System.out.println("connected");
}
try {
System.out.println("creating publisher and waiting for " + numberOfSubscribers + " subscriber(s)...");
// create the publisher
Application.Publisher publisher = Application.Publisher.create("publisher", numberOfSubscribers);
// synchronize with subscribers
publisher.waitForSubscribers();
System.out.println("synchronized with the subscriber(s)");
Application.This.setRunning();
// sending data
publisher.sendTwoParts(Buffer.serialize("first"), Buffer.serialize("hello"));
publisher.sendTwoParts(Buffer.serialize("second"), Buffer.serialize("world"));
publisher.sendTwoParts(Buffer.serialize("third"), Buffer.serialize("!"));
publisher.terminate();
} catch (RemoteException e) {
System.out.println("publisher error");
} finally {
// Do not forget to terminate This.
Application.This.terminate();
}
System.out.println("finished the application");
}
}
\ No newline at end of file
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.test;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.cameo.ConnectionTimeout;
import fr.ill.ics.cameo.Server;
import fr.ill.ics.cameo.impl.Buffer;
public class TestSubscriberAndPublisherTwoApplication {
public static void main(String[] args) {
Application.This.init(args);
String applicationName = null;
int numberOfTimes = 1;
if (args.length > 1) {
applicationName = args[0];
System.out.println("publisher application is " + applicationName);
if (args.length > 2) {
numberOfTimes = Integer.parseInt(args[1]);
}
} else {
System.err.println("arguments: [application name]");
System.exit(-1);
}
// get the client services
Server server = Application.This.getServer();
if (Application.This.isAvailable() && server.isAvailable()) {
System.out.println("connected application");
System.out.println("connected server " + server);
} else {
System.exit(-1);
}
try {
// loop the number of times.
for (int i = 0; i < numberOfTimes; ++i) {
// start the application.
Application.Instance publisherApplication = server.start(applicationName);
System.out.println("started application " + publisherApplication);
Application.Subscriber subscriber = Application.Subscriber.create(publisherApplication, "publisher");
System.out.println("created subscriber " + subscriber);
// receiving data
while (true) {
byte[][] data = subscriber.receiveTwoParts();
if (data != null) {
System.out.println("received " + Buffer.parseString(data[0]) + " " + Buffer.parseString(data[1]));
} else {
break;
}
}
System.out.println("finished stream, stream ? " + !subscriber.hasEnded());
// wait for the application.
int state = publisherApplication.waitFor();
System.out.println("publisher application terminated with state " + Application.State.toString(state));
// terminate the subscriber.
subscriber.terminate();
}
} catch (ConnectionTimeout e) {
System.out.println("connection timeout");
} finally {
// Do not forget to terminate This.
Application.This.terminate();
}
System.out.println("finished the application");
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment