Commit 7f8c39d9 authored by legoc's avatar legoc
Browse files

InstanceImpl and ServerImpl removed

parent bf0d2a80
......@@ -19,13 +19,10 @@ package fr.ill.ics.cameo.base;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.base.impl.InstanceImpl;
import fr.ill.ics.cameo.base.impl.ServerImpl;
import fr.ill.ics.cameo.base.impl.ThisImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -45,10 +42,10 @@ public class Application {
public static class Com {
private ServerImpl server;
private Server server;
private int applicationId;
Com(ServerImpl server, int applicationId) {
Com(Server server, int applicationId) {
this.server = server;
this.applicationId = applicationId;
}
......@@ -151,7 +148,7 @@ public class Application {
private static Com com;
static private void initServer() {
server = new Server(impl.getServer());
server = impl.getServer();
server.registerEventListener(impl.getEventListener());
if (impl.getStarterEndpoint() != null) {
......@@ -284,7 +281,7 @@ public class Application {
}
// Iterate the instances to find the id
List<Instance> instances = starterServer.connectAll(impl.getStarterName());
List<Instance> instances = starterServer.connectAll(impl.getStarterName(), 0);
for (Instance i : instances) {
if (i.getId() == impl.getStarterId()) {
return i;
......@@ -406,212 +403,6 @@ public class Application {
}
}
/**
* Class that implements simple asynchronous programming model.
* There is no connection timeout as they are hidden as bad results.
* The class is not thread safe and should be used in a single thread.
* Question? stop/kill can be called concurrently
*
*/
public static class Instance {
private InstanceImpl impl;
/**
* Class defining the Communication Operations Manager (COM).
*/
public static class Com {
private ServerImpl server;
private int applicationId;
Com(ServerImpl server, int applicationId) {
this.server = server;
this.applicationId = applicationId;
}
public String getKeyValue(String key) throws UndefinedApplicationException, UndefinedKeyException {
return server.getKeyValue(applicationId, key);
}
public JSONObject requestJSON(JSONObject request) {
return server.requestJSON(request);
}
/**
* Method provided by convenience to simplify the parsing of JSON messages.
* @param message
* @return
*/
public JSONObject parse(byte[] message) {
try {
return server.parse(message);
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse message");
}
}
}
private Com com;
Instance(InstanceImpl impl) {
this.impl = impl;
this.com = new Com(impl.getServer(), impl.getId());
}
public String getName() {
return impl.getName();
}
public int getId() {
return impl.getId();
}
public Endpoint getEndpoint() {
return impl.getEndpoint();
}
public Endpoint getStatusEndpoint() {
return impl.getStatusEndpoint();
}
public String getNameId() {
return impl.getNameId();
}
public Com getCom() {
return com;
}
public boolean hasResult() {
return impl.hasResult();
}
/**
*
* @return true if the instance exists, i.e. the task is executed, otherwise false.
*/
public boolean exists() {
return impl.exists();
}
/**
* Returns the error message.
* @return
*/
public String getErrorMessage() {
return impl.getErrorMessage();
}
/**
* Requests the stop of the application.
* The stop is not blocking, so it must be followed by a call to waitFor to ensure the termination of the application.
* Or it can be called in parallel with waitFor.
* @return false if it does not succeed, the error message is then set.
*/
public boolean stop() {
return impl.stop();
}
/**
* Requests the kill of the application.
* The stop is not blocking, so it must be followed by a call to waitFor to ensure the termination of the application.
* Or it can be called in parallel with waitFor.
* @return false if it does not succeed, the error message is then set.
*/
public boolean kill() {
return impl.kill();
}
public int waitFor(int states) {
return impl.waitFor(states);
}
/**
* The call is blocking until a terminal state is received i.e. SUCCESS, STOPPED, KILLED, ERROR.
*/
public int waitFor() {
return impl.waitFor(0);
}
/**
* TODO Temporary access.
* @param eventName
* @return
*/
public int waitFor(String eventName) {
return impl.waitFor(eventName);
}
public int waitFor(KeyValue keyValue) {
return impl.waitFor(keyValue);
}
public void cancelWaitFor() {
impl.cancelWaitFor();
}
public int getLastState() {
// The call is not blocking but pops the entire content of the queue and returns the last received state, i.e. the current state.
return impl.waitFor(0, false);
}
public int getActualState() {
return impl.getActualState();
}
public Set<Integer> getPastStates() {
return impl.getPastStates();
}
/**
* Returns the exit code.
* @return null if is not assigned, the exit code otherwise
*/
public Integer getExitCode() {
return impl.getExitCode();
}
/**
* Terminates the local instances by removing the status listener.
* Does not kill nor stop the execution application instance.
* It terminates the local object.
*/
public void terminate() {
impl.terminate();
}
/**
* Returns the result of the Instance.
* @return
*/
public byte[] getBinaryResult() {
return impl.getResult();
}
/**
* Returns the result of the Instance. Provided by convenience for string results.
* Returns always null when the Instance was created by a connect call.
* @return
*/
public String getStringResult() {
return impl.getStringResult();
}
public OutputStreamSocket getOutputStreamSocket() {
return impl.getOutputStreamSocket();
}
@Override
public String toString() {
return impl.toString();
}
}
public static class Configuration {
private String name;
......
......@@ -26,12 +26,12 @@ public class ConnectionChecker {
void handle(boolean available);
}
private ServerImpl server;
private Server server;
private Handler handler;
private Thread thread = null;
private TimeCondition waitCondition = new TimeCondition();
public ConnectionChecker(ServerImpl server, Handler handler) {
public ConnectionChecker(Server server, Handler handler) {
this.server = server;
this.handler = handler;
}
......
......@@ -27,12 +27,12 @@ import fr.ill.ics.cameo.messages.Messages;
public class EventStreamSocket {
private ServerImpl server;
private Server server;
private Zmq.Socket socket;
private Zmq.Socket cancelSocket;
private boolean canceled = false;
public EventStreamSocket(ServerImpl server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
public EventStreamSocket(Server server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
super();
this.server = server;
this.socket = subscriber;
......
......@@ -14,23 +14,21 @@
* limitations under the Licence.
*/
package fr.ill.ics.cameo.base.impl;
package fr.ill.ics.cameo.base;
import java.util.concurrent.ConcurrentLinkedDeque;
import fr.ill.ics.cameo.base.Event;
import fr.ill.ics.cameo.base.EventListener;
import fr.ill.ics.cameo.base.EventStreamSocket;
import fr.ill.ics.cameo.base.impl.ServerImpl;
/**
* The EventThread class forwards the events from the EventStreamSocket socket to the registered listeners.
*/
class EventThread extends Thread {
private ServerImpl server;
private Server server;
private EventStreamSocket socket;
EventThread(ServerImpl server, EventStreamSocket socket) {
EventThread(Server server, EventStreamSocket socket) {
this.server = server;
this.socket = socket;
}
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.base.impl;
package fr.ill.ics.cameo.base;
import java.util.Set;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.CancelEvent;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Event;
import fr.ill.ics.cameo.base.EventListener;
import fr.ill.ics.cameo.base.KeyEvent;
import fr.ill.ics.cameo.base.KeyValue;
import fr.ill.ics.cameo.base.OutputStreamSocket;
import fr.ill.ics.cameo.base.PortEvent;
import fr.ill.ics.cameo.base.PublisherEvent;
import fr.ill.ics.cameo.base.ResultEvent;
import fr.ill.ics.cameo.base.StatusEvent;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.base.impl.InstanceWaitingImpl;
import fr.ill.ics.cameo.base.impl.Response;
import fr.ill.ics.cameo.base.impl.ServerImpl;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
/**
* Class that implements simple asynchronous programming model.
* There is no connection timeout as they are hidden as bad results.
......@@ -42,12 +19,11 @@ import fr.ill.ics.cameo.strings.Endpoint;
* @author legoc
*
*/
public class InstanceImpl extends EventListener {
public class Instance extends EventListener {
private ServerImpl server;
private Server server;
private int id = -1;
private OutputStreamSocket outputSocket;
private String errorMessage;
private int pastStates = 0;
private int initialState = Application.State.UNKNOWN;
......@@ -56,7 +32,48 @@ public class InstanceImpl extends EventListener {
private InstanceWaitingImpl waiting = new InstanceWaitingImpl(this);
private Integer exitCode;
InstanceImpl(ServerImpl server) {
/**
* Class defining the Communication Operations Manager (COM).
*/
public static class Com {
private Server server;
private int applicationId;
Com(Server server, int applicationId) {
this.server = server;
this.applicationId = applicationId;
}
public String getKeyValue(String key) throws UndefinedApplicationException, UndefinedKeyException {
return server.getKeyValue(applicationId, key);
}
public JSONObject requestJSON(JSONObject request) {
return server.requestJSON(request);
}
/**
* Method provided by convenience to simplify the parsing of JSON messages.
* @param message
* @return
*/
public JSONObject parse(byte[] message) {
try {
return server.parse(message);
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse message");
}
}
}
private Com com;
Instance(Server server) {
this.server = server;
// Register the waiting
......@@ -67,10 +84,17 @@ public class InstanceImpl extends EventListener {
this.id = id;
}
public ServerImpl getServer() {
public Server getServer() {
return server;
}
public Com getCom() {
if (com == null) {
com = new Com(server, id);
}
return com;
}
void setOutputStreamSocket(OutputStreamSocket outputSocket) {
if (outputSocket != null) {
this.outputSocket = outputSocket;
......@@ -382,4 +406,4 @@ public class InstanceImpl extends EventListener {
return getName() + "." + id + "@" + server.getEndpoint();
}
}
\ No newline at end of file
}
......@@ -27,14 +27,14 @@ import fr.ill.ics.cameo.messages.Messages;
public class OutputStreamSocket {
private ServerImpl server;
private Server server;
private Zmq.Socket socket;
private Zmq.Socket cancelSocket;
private int applicationId = -1;
private boolean ended = false;
private boolean canceled = false;
public OutputStreamSocket(ServerImpl server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
public OutputStreamSocket(Server server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
super();
this.server = server;
this.socket = subscriber;
......
......@@ -16,13 +16,14 @@
package fr.ill.ics.cameo.base.impl;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.Waiting;
public class InstanceWaitingImpl extends Waiting {
private InstanceImpl instance;
private Instance instance;
public InstanceWaitingImpl(InstanceImpl instance) {
public InstanceWaitingImpl(Instance instance) {
this.instance = instance;
}
......
......@@ -29,6 +29,7 @@ import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Event;
import fr.ill.ics.cameo.base.EventListener;
import fr.ill.ics.cameo.base.InvalidArgumentException;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.base.StatusEvent;
import fr.ill.ics.cameo.base.UnexpectedException;
import fr.ill.ics.cameo.base.UnmanagedApplicationException;
......@@ -48,7 +49,7 @@ public class ThisImpl {
private String starterName;
private int starterId;
private ServerImpl server;
private Server server;
// Definition of a EventListener member.
private EventListener eventListener = new EventListener();
......@@ -128,7 +129,7 @@ public class ThisImpl {
private void initApplication() {
// Create the server.
server = new ServerImpl(serverEndpoint, 0);
server = new Server(serverEndpoint, 0);
// Init the unmanaged application.
if (!managed) {
......@@ -167,7 +168,7 @@ public class ThisImpl {
return starterId;
}
public ServerImpl getServer() {
public Server getServer() {
return server;
}
......
......@@ -2,7 +2,7 @@ package fr.ill.ics.cameo.coms;
import java.util.List;
import fr.ill.ics.cameo.base.Application.Instance;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
......
......@@ -2,8 +2,8 @@ package fr.ill.ics.cameo.coms;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.Application.Instance;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.coms.impl.RequesterImpl;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.messages.JSON;
......
......@@ -3,7 +3,7 @@ package fr.ill.ics.cameo.coms;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.Application.Instance;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.coms.impl.SubscriberImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......
......@@ -20,9 +20,9 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.Application.Instance;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.impl.CancelIdGenerator;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
......
......@@ -27,6 +27,7 @@ import java.util.jar.Manifest;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.Option;
import fr.ill.ics.cameo.base.OutputPrintThread;
import fr.ill.ics.cameo.base.OutputStreamSocket;
......@@ -458,8 +459,8 @@ public class Console {
// Do nothing.
}
List<Application.Instance> applications = server.connectAll(applicationName);
for (Application.Instance application : applications) {
List<Instance> applications = server.connectAll(applicationName);
for (Instance application : applications) {
if ((applicationId == -1) || (applicationId == application.getId())) {
......@@ -493,8 +494,8 @@ public class Console {
// Do nothing.
}
List<Application.Instance> applications = server.connectAll(applicationName);
for (Application.Instance application : applications) {
List<Instance> applications = server.connectAll(applicationName);
for (Instance application : applications) {
if ((applicationId == -1) || (applicationId == application.getId())) {