Commit 4befa6d0 authored by yannick legoc's avatar yannick legoc
Browse files

Replaced Corba data changed publisher.

parent e8f8a3c1
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.nscclient.notification;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.nomadserver.common.Common;
import fr.ill.ics.nomadserver.common.DataChangeSubscriberPackage.ConditionState;
import fr.ill.ics.nomadserver.notification.NotificationMessage;
import fr.ill.ics.nomadserver.notification.NotificationMessage.CommandProgressionChanged;
import fr.ill.ics.nomadserver.notification.NotificationMessage.CommandStateChanged;
import fr.ill.ics.nomadserver.notification.NotificationMessage.ConditionActionsStateChanged;
import fr.ill.ics.nomadserver.notification.NotificationMessage.ConditionStateChanged;
import fr.ill.ics.nomadserver.notification.NotificationMessage.ConfigurationChanged;
import fr.ill.ics.nomadserver.notification.NotificationMessage.PropertyChanged;
import fr.ill.ics.nscclient.dataprovider.DataAccessor.ClientCommandState;
import fr.ill.ics.nscclient.dataprovider.DataAccessor.ClientConditionState;
import fr.ill.ics.nscclient.dataprovider.DataAccessor.ClientEnableState;
import fr.ill.ics.nscclient.serverconnection.ServerInstance;
public class DataChangeSubscriber {
private Application.Subscriber subscriber;
private Thread subscriberThread;
private static Map<String, DataChangeSubscriber> instances = new HashMap<String, DataChangeSubscriber>();
private String serverId;
private DataChangeSubscriber(String serverId) {
this.serverId = serverId;
}
public static DataChangeSubscriber getInstance(String serverId) {
if (!instances.containsKey(serverId)) {
DataChangeSubscriber instance = new DataChangeSubscriber(serverId);
instances.put(serverId, instance);
}
return instances.get(serverId);
}
public static Map<String, DataChangeSubscriber> getInstances() {
return instances;
}
public void init() {
// Connect nomad server.
Application.Instance nomad = ServerInstance.getInstance().getApplicationInstance(serverId);
if (nomad == null) {
System.err.println("Problem to connect to the nomad server " + serverId);
return;
}
// Define the data change publisher.
String dataChangePublisherName = "data_change_publisher";
System.out.println("Trying to connect data change subscriber to " + dataChangePublisherName);
// Create the subscriber.
subscriber = Application.Subscriber.create(nomad, dataChangePublisherName);
System.out.println("Connected data change subscriber " + subscriber);
// Start the thread.
subscriberThread = new Thread(new Runnable() {
@Override
public void run() {
// Receive data
while (true) {
// Blocking call.
byte[][] data = subscriber.receiveTwoParts();
if (data != null) {
try {
if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.PropertyChanged) {
notifyPropertyChanged(NotificationMessage.PropertyChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.CommandStateChanged) {
notifyCommandStateChanged(NotificationMessage.CommandStateChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.CommandProgressionChanged) {
notifyCommandProgressionChanged(NotificationMessage.CommandProgressionChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.ConfigurationChanged) {
notifyConfigurationChanged(NotificationMessage.ConfigurationChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.ConditionStateChanged) {
notifyConditionStateChanged(NotificationMessage.ConditionStateChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else if (NotificationMessage.Message.parseFrom(data[0]).getType() == fr.ill.ics.nomadserver.notification.NotificationMessage.Message.Type.ConditionActionsStateChanged) {
notifyConditionActionsStateChanged(NotificationMessage.ConditionActionsStateChanged.parseFrom(new ByteArrayInputStream(data[1])));
}
else {
System.out.println("unable to process " + NotificationMessage.Message.parseFrom(data[0]).getType());
}
}
catch (IOException e) {
System.err.println("Cannot parse notification data change message");
}
}
else {
break;
}
}
}
});
subscriberThread.start();
}
public void unsubscribe() {
System.out.println("Unsubscribing data change...");
// Stop the subscriber.
subscriber.cancel();
// Join the thread.
try {
subscriberThread.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Terminate the subscriber.
subscriber.terminate();
System.out.println("Unsubscribed from the data change");
}
private void notifyPropertyChanged(PropertyChanged message) {
DataNotificationClient.getInstance().propertyChanged(message.getDatabaseID(), message.getPropertyID());
}
private void notifyCommandStateChanged(CommandStateChanged message) {
ClientCommandState clientState = ClientCommandState.INACTIVE;
Common.CommandStateType.Type state = message.getState();
if (state == Common.CommandStateType.Type.ACTIVE) {
clientState = ClientCommandState.ACTIVE;
} else if (state == Common.CommandStateType.Type.PAUSED) {
clientState = ClientCommandState.PAUSED;
} else if (state == Common.CommandStateType.Type.PAUSING) {
clientState = ClientCommandState.PAUSING;
} else if (state == Common.CommandStateType.Type.STOPPING) {
clientState = ClientCommandState.STOPPING;
} else if (state == Common.CommandStateType.Type.RESTARTING) {
clientState = ClientCommandState.RESTARTING;
} else if (state == Common.CommandStateType.Type.PENDING) {
clientState = ClientCommandState.PENDING;
} else if (state == Common.CommandStateType.Type.STARTING) {
clientState = ClientCommandState.STARTING;
}
DataNotificationClient.getInstance().commandStateChanged(message.getDatabaseID(), message.getCommandID(), clientState);
}
private void notifyCommandProgressionChanged(CommandProgressionChanged message) {
DataNotificationClient.getInstance().commandProgressionChanged(message.getDatabaseID(), message.getCommandID(), message.getProgression());
}
private void notifyConfigurationChanged(ConfigurationChanged message) {
ClientEnableState clientEnable = ClientEnableState.NO_CHANGE;
Common.EnableStateType.Type enable = message.getEnable();
if (enable == Common.EnableStateType.Type.ENABLED) {
clientEnable = ClientEnableState.ENABLED;
} else if (enable == Common.EnableStateType.Type.DISABLED) {
clientEnable = ClientEnableState.DISABLED;
} else if (enable == Common.EnableStateType.Type.NO_CHANGE) {
clientEnable = ClientEnableState.NO_CHANGE;
}
DataNotificationClient.getInstance().configurationChanged(message.getDatabaseID(), message.getServantID(), clientEnable);
}
private void notifyConditionStateChanged(ConditionStateChanged message) {
ClientConditionState clientState = ClientConditionState.INACTIVE;
Common.ConditionStateType.Type state = message.getState();
if (state == Common.ConditionStateType.Type.CONDITION_ACTIVE) {
clientState = ClientConditionState.ACTIVE;
} else if (state == Common.ConditionStateType.Type.CONDITION_INACTIVE) {
clientState = ClientConditionState.INACTIVE;
} else if (state == Common.ConditionStateType.Type.CONDITION_ON_ACTIVATION_DELAY) {
clientState = ClientConditionState.ON_ACTIVATION_DELAY;
}
DataNotificationClient.getInstance().conditionStateChanged(message.getConditionID(), message.getOn(), clientState);
}
private void notifyConditionActionsStateChanged(ConditionActionsStateChanged message) {
ClientConditionState clientState = ClientConditionState.INACTIVE;
Common.ConditionStateType.Type state = message.getState();
if (state == Common.ConditionStateType.Type.CONDITION_ACTIVE) {
clientState = ClientConditionState.ACTIVE;
} else if (state == Common.ConditionStateType.Type.CONDITION_INACTIVE) {
clientState = ClientConditionState.INACTIVE;
} else if (state == Common.ConditionStateType.Type.CONDITION_ON_ACTIVATION_DELAY) {
clientState = ClientConditionState.ON_ACTIVATION_DELAY;
}
DataNotificationClient.getInstance().conditionActionsStateChanged(message.getConditionID(), clientState);
}
}
......@@ -24,7 +24,6 @@ import org.omg.PortableServer.POAPackage.ServantNotActive;
import org.omg.PortableServer.POAPackage.WrongAdapter;
import org.omg.PortableServer.POAPackage.WrongPolicy;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.nomadserver.common.DataChangeSubscriber;
import fr.ill.ics.nomadserver.common.DataChangeSubscriberHelper;
import fr.ill.ics.nomadserver.core.SessionGateway;
......@@ -33,19 +32,17 @@ import fr.ill.ics.nscclient.notification.DataChangeSubscriberImpl;
public class CorbaSessionGateway {
private Application.Requester servantManagerRequester;
private SessionGateway sessionGateway;
private DataChangeSubscriberImpl dataChangeSubscriberImpl;
public CorbaSessionGateway(SessionGateway sessionGateway) {
this.sessionGateway = sessionGateway;
subscribeToDataChangePublisher();
//subscribeToDataChangePublisher();
}
public void logout() {
unsubscriberFromPropertyChangePublisher();
//unsubscriberFromPropertyChangePublisher();
try {
sessionGateway.logout();
......
......@@ -37,6 +37,7 @@ import fr.ill.ics.nscclient.condition.ConditionManagerAccessor;
import fr.ill.ics.nscclient.dataprovider.DataAccessor;
import fr.ill.ics.nscclient.dataprovider.ServantManagerAccessor;
import fr.ill.ics.nscclient.log.LogSubscriber;
import fr.ill.ics.nscclient.notification.DataChangeSubscriber;
import fr.ill.ics.nscclient.notification.commandzone.ServerCommandZoneEventManager;
import fr.ill.ics.nscclient.servant.ConfigurationManager;
import fr.ill.ics.nscclient.servant.ConfigurationManager.LoadFailure;
......@@ -68,6 +69,7 @@ public class ServerSessionManager {
throw new ConnectionFailure();
}
DataChangeSubscriber.getInstance(serverId).init();
DataAccessor.getInstance(serverId).init();
ServantManagerAccessor.getInstance(serverId).init();
......@@ -82,7 +84,7 @@ public class ServerSessionManager {
ConditionManagerAccessor.getInstance(serverId).init();
CommandLineAccessor.getInstance(serverId).init();
LogSubscriber.getInstance(serverId).init();
LogSubscriber.getInstance(serverId).init();
}
public static ServerSessionManager getInstance(String serverId) {
......@@ -152,7 +154,8 @@ public class ServerSessionManager {
LogSubscriber.getInstance(serverId).unsubscribe();
SurveySubcriberImpl.getInstance(serverId).unsubscribe();
DataChangeSubscriber.getInstance(serverId).unsubscribe();
DataAccessor.getInstance(serverId).reset();
ServantManagerAccessor.getInstance(serverId).reset();
CommandLineAccessor.getInstance(serverId).reset();
......@@ -169,14 +172,20 @@ public class ServerSessionManager {
try {
LogSubscriber.getInstance(serverId).unsubscribe();
} catch (Exception e) {
System.out.println("Cannot unsubscribe Log");
System.out.println("Cannot unsubscribe log");
}
try {
SurveySubcriberImpl.getInstance(serverId).unsubscribe();
} catch (Exception e) {
System.out.println("Cannot unsubscribe Survey");
System.out.println("Cannot unsubscribe survey");
}
try {
DataChangeSubscriber.getInstance(serverId).unsubscribe();
} catch (Exception e) {
System.out.println("Cannot unsubscribe data change");
}
if (currentSession != null) {
currentSession.logout();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment