Commit 2620ce2f authored by yannick legoc's avatar yannick legoc

Reimplemented Corba SurveyPublisher with Cameo.

parent 01941acf
......@@ -314,17 +314,6 @@
<arg line="${idlFlags} ${idlCommonDir}/DataChangeSubscriber.idl" />
</exec>
<echo message="Generating SurveySubscriber.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlCommonDir}/SurveySubscriber.idl" />
</exec>
<!-- data provider -->
<echo message="Generating SurveyPublisher.java" />
<exec executable="idl">
<arg line="${idlFlags} ${idlDataProviderDir}/SurveyPublisher.idl" />
</exec>
<!-- proto files -->
<echo message="Generating ServantConfiguration.java" />
<exec executable="protoc">
......
......@@ -170,7 +170,7 @@ public class LogSubscriber {
notifyLog(NotificationMessage.Log.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.LogXML) {
notifyLogXML(NotificationMessage.LogXML.parseFrom(new ByteArrayInputStream(data[1])));
notifyLogXML(NotificationMessage.MessageXML.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.ImageDataReady) {
notifyLogImageDataReady(NotificationMessage.ImageDataReady.parseFrom(new ByteArrayInputStream(data[1])));
......@@ -195,7 +195,7 @@ public class LogSubscriber {
public void unsubscribe() {
System.out.println("Unsubscribing...");
System.out.println("Unsubscribing log...");
// Stop the subscriber.
subscriber.cancel();
......@@ -315,7 +315,7 @@ public class LogSubscriber {
}
}
private void notifyLogXML(NotificationMessage.LogXML logMessage) {
private void notifyLogXML(NotificationMessage.MessageXML logMessage) {
synchronized (pendingXMLMessages) {
pendingXMLMessages.add(logMessage.getMessage());
......
......@@ -83,7 +83,6 @@ public class ServerSessionManager {
CommandLineAccessor.getInstance(serverId).init();
LogSubscriber.getInstance(serverId).init();
SurveySubcriberImpl.getInstance(serverId).init();
}
public static ServerSessionManager getInstance(String serverId) {
......
......@@ -18,6 +18,8 @@
package fr.ill.ics.nscclient.survey;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
......@@ -26,31 +28,20 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.omg.PortableServer.POAPackage.ServantNotActive;
import org.omg.PortableServer.POAPackage.WrongPolicy;
import fr.ill.ics.bridge.command.CommandZoneWrapper;
import fr.ill.ics.bridge.listeners.ServerSurveyXMLListener;
import fr.ill.ics.nomadserver.common.SurveySubscriberPOA;
import fr.ill.ics.nomadserver.dataprovider.SurveyPublisher;
import fr.ill.ics.nomadserver.dataprovider.SurveyPublisherHelper;
import fr.ill.ics.nscclient.corbabase.CorbaNamingService;
import fr.ill.ics.nscclient.corbabase.CorbaORB;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.nomadserver.notification.NotificationMessage;
import fr.ill.ics.nscclient.serverconnection.ServerInstance;
/**
*
* @author ortizh
*
*/
public class SurveySubcriberImpl extends SurveySubscriberPOA {
public class SurveySubcriberImpl {
private Set<ServerSurveyXMLListener> surveyXMLListeners = new LinkedHashSet<ServerSurveyXMLListener>();
private List<String> pendingXMLMessages = new ArrayList<String>();
private SurveyPublisher surveyPublisher = null;
private int id = 0;
private Application.Subscriber subscriber;
private Thread subscriberThread;
private static Map<String, SurveySubcriberImpl> instances = new HashMap<String, SurveySubcriberImpl>();
......@@ -102,57 +93,88 @@ public class SurveySubcriberImpl extends SurveySubscriberPOA {
}
public void setID(int subscriberID) {
id = subscriberID;
}
public int getID() {
return id;
}
public void init() {
public void subscribe() {
try {
CorbaORB.getInstance().getPOA().servant_to_reference(this);
} catch (ServantNotActive e) {
System.err.println("Servant not active");
} catch (WrongPolicy e) {
System.err.println("Wrong policy");
// Connect nomad server.
Application.Instance nomad = ServerInstance.getInstance().getApplicationInstance(serverId);
if (nomad == null) {
System.err.println("Problem to connect to the nomad server " + serverId);
return;
}
// Define the log publisher.
String surveyPublisherName = "survey_publisher";
System.out.println("Trying to connect survey subscriber to " + surveyPublisherName);
// Create the subscriber.
subscriber = Application.Subscriber.create(nomad, surveyPublisherName);
System.out.println("Connected survey subscriber " + subscriber);
// Start the thread.
subscriberThread = new Thread(new Runnable() {
@Override
public void run() {
try {
String contextName = "NomadServer";
if (!serverId.equals(CommandZoneWrapper.SERVER_ID)) {
contextName += serverId;
}
org.omg.CORBA.Object corbaObj = CorbaNamingService.getInstance().resolveObject(contextName, "DataProvider", "surveyPublisher");
surveyPublisher = SurveyPublisherHelper.narrow(corbaObj);
} catch (CorbaNamingService.CORBAResolveFailureException rfe) {
System.err.println("Unable to obtain SurveyPublisher from Naming Service");
} catch (org.omg.CORBA.SystemException e) {
System.err.println("Unable to obtain SurveyPublisher from Naming Service");
}
}
public void subscribe() {
id = surveyPublisher.subscribe(_this());
// Receive data
while (true) {
// Blocking call.
byte[][] data = subscriber.receiveTwoParts();
if (data != null) {
try {
if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.SurveyXML) {
notifySurveyXML(NotificationMessage.MessageXML.parseFrom(new ByteArrayInputStream(data[1])));
}
else {
System.out.println("unable to process " + NotificationMessage.Message.parseFrom(data[0]).getType());
}
}
catch (IOException e) {
System.err.println("Cannot parse notification survey message");
}
}
else {
break;
}
}
}
});
subscriberThread.start();
}
public void unsubscribe() {
surveyPublisher.unsubscribe(id);
System.out.println("Unsubscribing survey...");
// Stop the subscriber.
subscriber.cancel();
// Join the thread.
try {
subscriberThread.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Terminate the subscriber.
subscriber.terminate();
System.out.println("Unsubscribed from the survey");
}
public void notifyXML(String message) {
public void notifySurveyXML(NotificationMessage.MessageXML surveyMessage) {
synchronized (pendingXMLMessages) {
pendingXMLMessages.add(message);
pendingXMLMessages.add(surveyMessage.getMessage());
}
}
public void readAndDispatch() {
// copy pending messages to unblock any server calls
ArrayList<String> messages;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment