Commit 4300cea4 authored by yannick legoc's avatar yannick legoc

Reimplemented log subscriber with cameo and added profiles jeromq and

jzmq.
parent bc5e4867
......@@ -41,6 +41,35 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<profiles>
<profile>
<id>jzmq</id>
<properties>
<zmq.implementation>jzmq</zmq.implementation>
</properties>
<dependencies>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jzmq</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>jeromq</id>
<properties>
<zmq.implementation>jeromq</zmq.implementation>
</properties>
<dependencies>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.3.5</version>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>avalon-framework</groupId>
......@@ -65,7 +94,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-api-java</artifactId>
<version>0.0.1</version>
<version>0.1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
......@@ -295,22 +324,12 @@
<arg line="${idlFlags} ${idlCommonDir}/DataChangeSubscriber.idl" />
</exec>
<echo message="Generating LogSubscriber.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlCommonDir}/LogSubscriber.idl" />
</exec>
<echo message="Generating SurveySubscriber.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlCommonDir}/SurveySubscriber.idl" />
</exec>
<!-- data provider -->
<echo message="Generating LogPublisher.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlDataProviderDir}/LogPublisher.idl" />
</exec>
<echo message="Generating SurveyPublisher.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlDataProviderDir}/SurveyPublisher.idl" />
......@@ -327,6 +346,11 @@
<exec executable="protoc">
<arg line="${protoFlags} ${protoDir}/ServantConfiguration.proto" />
</exec>
<echo message="Generating NotificationMessages.java" />
<exec executable="protoc">
<arg line="${protoFlags} ${protoDir}/NotificationMessages.proto" />
</exec>
</target>
</configuration>
......
......@@ -18,6 +18,8 @@
package fr.ill.ics.nscclient.log;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
......@@ -26,24 +28,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.omg.PortableServer.POAPackage.ServantNotActive;
import org.omg.PortableServer.POAPackage.WrongPolicy;
import fr.ill.ics.bridge.command.CommandZoneWrapper;
import fr.ill.ics.bridge.events.ServerLogEvent;
import fr.ill.ics.bridge.events.ServerLogEvent.LogLevel;
import fr.ill.ics.bridge.listeners.ServerLogEventListener;
import fr.ill.ics.bridge.listeners.ServerLogImageDataReadyListener;
import fr.ill.ics.bridge.listeners.ServerLogXMLListener;
import fr.ill.ics.nomadserver.common.LogSubscriberPOA;
import fr.ill.ics.nomadserver.common.LogSubscriberPackage.Level;
import fr.ill.ics.nomadserver.common.LogSubscriberPackage.Type;
import fr.ill.ics.nomadserver.dataprovider.LogPublisher;
import fr.ill.ics.nomadserver.dataprovider.LogPublisherHelper;
import fr.ill.ics.nscclient.corbabase.CorbaNamingService;
import fr.ill.ics.nscclient.corbabase.CorbaORB;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.cameo.ConnectionTimeout;
import fr.ill.ics.cameo.Server;
import fr.ill.ics.nomadserver.notification.NotificationMessage;
import fr.ill.ics.util.ConfigManager;
public class LogSubcriberImpl extends LogSubscriberPOA {
public class LogSubcriberImpl {
private Set<ServerLogEventListener> logEventListeners = new LinkedHashSet<ServerLogEventListener>();
private Set<ServerLogEvent> pendingChanges = new LinkedHashSet<ServerLogEvent>();
......@@ -54,8 +51,8 @@ public class LogSubcriberImpl extends LogSubscriberPOA {
private ServerLogImageDataReadyListener logImageDataReadyListener;
private List<String> pendingImageDataReadyNames = new ArrayList<String>();
private LogPublisher logpublisher = null;
private int id = 0;
private Application.Subscriber subscriber;
private Thread subscriberThread;
private static Map<String, LogSubcriberImpl> instances = new HashMap<String, LogSubcriberImpl>();
......@@ -138,48 +135,124 @@ public class LogSubcriberImpl extends LogSubscriberPOA {
public void setLogImageDataReadyListener(ServerLogImageDataReadyListener listener) {
logImageDataReadyListener = listener;
}
public void setID(int subscriberID) {
id = subscriberID;
}
public int getID() {
return id;
}
public void init() {
try {
CorbaORB.getInstance().getPOA().servant_to_reference(this);
} catch (ServantNotActive e) {
System.err.println("Servant not active");
} catch (WrongPolicy e) {
System.err.println("Wrong policy");
}
try {
String contextName = "NomadServer";
if (!serverId.equals(CommandZoneWrapper.SERVER_ID)) {
contextName += serverId;
}
org.omg.CORBA.Object corbaObj = CorbaNamingService.getInstance().resolveObject(contextName, "DataProvider", "logPublisher");
// Get the server endpoint.
String nomadServerEndpoint = ConfigManager.getInstance().getNomadServerEndpoint();
// Connect to the server.
Server server = new Server(nomadServerEndpoint);
logpublisher = LogPublisherHelper.narrow(corbaObj);
// Define the log publisher.
String logPublisherName = "log_publisher";
// Connect nomad server.
Application.Instance nomad = null;
// Do not connect to the same application if it is simulated.
if (!serverId.equals(CommandZoneWrapper.SERVER_ID)) {
} catch (CorbaNamingService.CORBAResolveFailureException rfe) {
System.err.println("Unable to obtain LogPublisher from Naming Service");
} catch (org.omg.CORBA.SystemException e) {
System.err.println("Unable to obtain LogPublisher from Naming Service");
// Add the server id to the publisher name.
logPublisherName += serverId;
try {
// Find the simulated instance.
List<Application.Instance> nomadApps = server.connectAll("nssim");
for (Application.Instance app : nomadApps) {
if (app.exists() && app.getId() == Integer.parseInt(serverId)) {
nomad = app;
break;
}
}
} catch (ConnectionTimeout e) {
System.err.println("Timeout while connecting nomad server");
}
}
}
public void subscribe() {
id = logpublisher.subscribe(_this());
else {
// Find the real instance.
String[] nomadAppNames = {"ns", "nsd", "nsv"};
try {
for (String appName : nomadAppNames) {
nomad = server.connect(appName);
if (nomad.exists()) {
break;
}
}
} catch (ConnectionTimeout e) {
System.err.println("Timeout while connecting nomad server");
}
}
if (nomad == null) {
System.err.println("Problem to connect to the nomad server " + serverId);
}
System.out.println("Trying to connect log subscriber to " + logPublisherName);
// Create the subscriber.
subscriber = Application.Subscriber.create(nomad, logPublisherName);
System.out.println("Connected log subscriber " + subscriber);
// Start the thread.
subscriberThread = new Thread(new Runnable() {
@Override
public void run() {
// Receive data
while (true) {
// Blocking call.
byte[][] data = subscriber.receiveTwoParts();
if (data != null) {
try {
if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.Log) {
notifyLog(NotificationMessage.Log.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.LogXML) {
notifyLogXML(NotificationMessage.LogXML.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.ImageDataReady) {
notifyLogImageDataReady(NotificationMessage.ImageDataReady.parseFrom(new ByteArrayInputStream(data[1])));
}
else {
System.out.println("unable to process " + NotificationMessage.Message.parseFrom(data[0]).getType());
}
}
catch (IOException e) {
System.err.println("Cannot parse notification log message");
}
}
else {
break;
}
}
}
});
subscriberThread.start();
}
public void unsubscribe() {
logpublisher.unsubscribe(id);
System.out.println("Unsubscribing...");
// Stop the subscriber.
subscriber.cancel();
// Join the thread.
try {
subscriberThread.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Unsubscribed from the log");
}
public void notifyServerLogEventListeners(ServerLogEvent event) {
......@@ -256,39 +329,44 @@ public class LogSubcriberImpl extends LogSubscriberPOA {
}
}
private void notifyLog(NotificationMessage.Log logMessage) {
public void _notify(Type logType, Level logLevel, String message, int commandID, int servantID) {
LogLevel level = LogLevel.IGNORE;
if (logLevel == Level.IGNORE) {
level = LogLevel.IGNORE;
} else if (logLevel == Level.INFO) {
if (logMessage.getLevel() == fr.ill.ics.nomadserver.notification.NotificationMessage.Log.Level.INFO) {
level = LogLevel.INFO;
} else if (logLevel == Level.WARNING) {
}
else if (logMessage.getLevel() == fr.ill.ics.nomadserver.notification.NotificationMessage.Log.Level.WARNING) {
level = LogLevel.WARNING;
} else if (logLevel == Level.ERROR) {
}
else if (logMessage.getLevel() == fr.ill.ics.nomadserver.notification.NotificationMessage.Log.Level.ERROR) {
level = LogLevel.ERROR;
} else if (logLevel == Level.DEBUGGING) {
}
else if (logMessage.getLevel() == fr.ill.ics.nomadserver.notification.NotificationMessage.Log.Level.DEBUG) {
level = LogLevel.DEBUG;
} else if (logLevel == Level.SYNC) {
}
else if (logMessage.getLevel() == fr.ill.ics.nomadserver.notification.NotificationMessage.Log.Level.SYNC) {
level = LogLevel.SYNC;
}
ServerLogEvent event = new ServerLogEvent(level, message);
ServerLogEvent event = new ServerLogEvent(level, logMessage.getMessage());
synchronized (pendingChanges) {
pendingChanges.add(event);
}
}
private void notifyLogXML(NotificationMessage.LogXML logMessage) {
@Override
public void notifyXML(String message) {
synchronized (pendingXMLMessages) {
pendingXMLMessages.add(message);
pendingXMLMessages.add(logMessage.getMessage());
}
}
@Override
public void notifyImageDataReady(String imageName) {
private void notifyLogImageDataReady(NotificationMessage.ImageDataReady logMessage) {
synchronized (pendingImageDataReadyNames) {
pendingImageDataReadyNames.add(imageName);
}
pendingImageDataReadyNames.add(logMessage.getImageName());
}
}
}
\ No newline at end of file
}
......@@ -112,8 +112,6 @@ public class ServerSessionManager {
ControllerManager.initInstance(serverId);
DriverManager.initInstance(serverId);
}
LogSubcriberImpl.getInstance(serverId).subscribe();
}
public void logintab(boolean standAlone) throws CorbaSessionManager.ClientAlreadyLaunchedException, ConnectionFailure, LoadFailure {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment