Commit 2c614eef authored by legoc's avatar legoc
Browse files

Simplified EventThread that only forwards events to listeners

parent 84cdc19b
......@@ -20,7 +20,6 @@ package fr.ill.ics.cameo;
import java.util.ArrayList;
import java.util.List;
import fr.ill.ics.cameo.Application.Instance;
import fr.ill.ics.cameo.impl.ApplicationImpl;
import fr.ill.ics.cameo.impl.InstanceImpl;
import fr.ill.ics.cameo.impl.PublisherImpl;
......
package fr.ill.ics.cameo;
/*
* Copyright 2015 Institut Laue-Langevin
*
......@@ -15,6 +14,7 @@ package fr.ill.ics.cameo;
* limitations under the Licence.
*/
package fr.ill.ics.cameo;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -76,8 +76,4 @@ public class EventListener {
System.out.println("interrupted EventListener while putting");
}
}
public void notifyTerminalState(int applicationId) {
// do nothing here
}
}
\ No newline at end of file
......@@ -19,7 +19,6 @@ package fr.ill.ics.cameo;
import com.google.protobuf.InvalidProtocolBufferException;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.proto.Messages;
public class EventStreamSocket {
......
......@@ -18,15 +18,13 @@ package fr.ill.ics.cameo.impl;
import java.util.concurrent.ConcurrentLinkedDeque;
import fr.ill.ics.cameo.Application;
import fr.ill.ics.cameo.Event;
import fr.ill.ics.cameo.EventListener;
import fr.ill.ics.cameo.EventStreamSocket;
import fr.ill.ics.cameo.PortEvent;
import fr.ill.ics.cameo.PublisherEvent;
import fr.ill.ics.cameo.ResultEvent;
import fr.ill.ics.cameo.StatusEvent;
/**
* The EventThread class forwards the events from the EventStreamSocket socket to the registered listeners.
*/
class EventThread extends Thread {
private ServerImpl server;
......@@ -37,89 +35,6 @@ class EventThread extends Thread {
this.socket = socket;
}
private void processStatusEvent(StatusEvent status) {
// Test the terminal state
int state = status.getState();
boolean terminal = false;
if (state == Application.State.SUCCESS
|| state == Application.State.STOPPED
|| state == Application.State.KILLED
|| state == Application.State.ERROR) {
terminal = true;
}
// Send event to listeners.
// The EventListener contains the name attribute but not the application id because
// the id is not available at the registration.
ConcurrentLinkedDeque<EventListener> eventListeners = server.getEventListeners();
for (EventListener listener : eventListeners) {
// If the application name is null, all the status are pushed
// otherwise, filter on the name
if (listener.getName() == null
|| listener.getName().equals(status.getName())) {
listener.pushEvent(status);
// In case of terminal state, unregister the listener
// otherwise a memory leak occurs with Instance classes
if (terminal) {
listener.notifyTerminalState(status.getId());
}
}
}
}
private void processResultEvent(ResultEvent result) {
// Send event to listeners.
// The EventListener contains the name attribute but not the application id because
// the id is not available at the registration.
ConcurrentLinkedDeque<EventListener> eventListeners = server.getEventListeners();
for (EventListener listener : eventListeners) {
// Filter on the name
if (listener.getName() != null && listener.getName().equals(result.getName())) {
listener.pushEvent(result);
}
}
}
private void processPublisherEvent(PublisherEvent publisher) {
// Send event to listeners.
// The EventListener contains the name attribute but not the application id because
// the id is not available at the registration.
ConcurrentLinkedDeque<EventListener> eventListeners = server.getEventListeners();
for (EventListener listener : eventListeners) {
// If the application name is null, all the status are pushed
// otherwise, filter on the name
if (listener.getName() == null
|| listener.getName().equals(publisher.getName())) {
listener.pushEvent(publisher);
}
}
}
private void processPortEvent(PortEvent port) {
// Send event to listeners.
// The EventListener contains the name attribute but not the application id because
// the id is not available at the registration.
ConcurrentLinkedDeque<EventListener> eventListeners = server.getEventListeners();
for (EventListener listener : eventListeners) {
// If the application name is null, all the status are pushed
// otherwise, filter on the name
if (listener.getName() == null
|| listener.getName().equals(port.getName())) {
listener.pushEvent(port);
}
}
}
public void run() {
try {
......@@ -131,17 +46,15 @@ class EventThread extends Thread {
return;
}
if (event instanceof StatusEvent) {
processStatusEvent((StatusEvent)event);
} else if (event instanceof ResultEvent) {
processResultEvent((ResultEvent)event);
} else if (event instanceof PublisherEvent) {
processPublisherEvent((PublisherEvent)event);
// Forward the event to the listeners.
ConcurrentLinkedDeque<EventListener> eventListeners = server.getEventListeners();
for (EventListener listener : eventListeners) {
} else if (event instanceof PortEvent) {
processPortEvent((PortEvent)event);
// If the application name is null, all the status are pushed, otherwise, filter on the name.
if (listener.getName() == null
|| listener.getName().equals(event.getName())) {
listener.pushEvent(event);
}
}
}
......
......@@ -171,11 +171,26 @@ public class InstanceImpl extends EventListener {
}
@Override
public void notifyTerminalState(int applicationId) {
public void pushEvent(Event event) {
super.pushEvent(event);
// Unregister here.
if (applicationId == this.id) {
terminate();
// In case of status event, we need to terminate the instance to ensure to release the zeromq resources.
if (event instanceof StatusEvent) {
StatusEvent status = (StatusEvent)event;
int state = status.getState();
// Test if the state is terminal.
if (state == Application.State.SUCCESS
|| state == Application.State.STOPPED
|| state == Application.State.KILLED
|| state == Application.State.ERROR) {
// Unregister here.
if (status.getId() == this.id) {
terminate();
}
}
}
}
......
......@@ -19,7 +19,6 @@ package fr.ill.ics.cameo.impl;
import com.google.protobuf.InvalidProtocolBufferException;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.Zmq.Context;
import fr.ill.ics.cameo.proto.Messages.MessageType;
import fr.ill.ics.cameo.proto.Messages.MessageType.Type;
......
......@@ -22,7 +22,6 @@ import fr.ill.ics.cameo.Application;
import fr.ill.ics.cameo.ConnectionTimeout;
import fr.ill.ics.cameo.UnexpectedException;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.Zmq.Context;
import fr.ill.ics.cameo.proto.Messages;
import fr.ill.ics.cameo.proto.Messages.RequestResponse;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment