Commit 29487ec0 authored by Remi Perenon's avatar Remi Perenon

First ZMQ communication between jobs and GUI trace

parent d0932372
Pipeline #4811 passed with stages
in 7 minutes and 41 seconds
......@@ -15,18 +15,59 @@
import abc
import glob
import os
import Queue as queue
import random
import stat
import string
import subprocess
import sys
import threading
import traceback
import zmq
from MDANSE import PLATFORM, REGISTRY
from MDANSE.Core.Error import Error
from MDANSE.Framework.Configurable import Configurable
from MDANSE.Framework.Jobs.JobStatus import JobStatus
from MDANSE.Framework.OutputVariables.IOutputVariable import OutputData
from MDANSE.Core.Singleton import Singleton
class ZMQThread(threading.Thread):
__metaclass__ = Singleton
def __init__(self):
threading.Thread.__init__(self, target=self.mainLoop)
self.stop_flag = False
self.messages = queue.Queue()
def __getstate__(self):
pass
def __setstate__(self):
pass
def mainLoop(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 10)
self.socket.setsockopt(zmq.SNDTIMEO, 10)
self.socket.setsockopt(zmq.CONNECT_TIMEOUT, 10)
self.socket.connect("tcp://localhost:5556")
while not self.stop_flag:
while not self.messages.empty():
message = self.messages.get()
try:
self.socket.send(message)
# Get the reply.
self.socket.recv()
except:
pass
def stopThread(self):
self.stop_flag = True
def sendMessage(self, message):
self.messages.put(message)
class JobError(Error):
'''
......@@ -290,10 +331,14 @@ class IJob(Configurable):
"""
Run the job.
"""
self._name = None
try:
self._name = "%s_%s" % (self._type,IJob.define_unique_name())
self.thread = ZMQThread()
try:
self.thread.start()
except:pass
self.thread.sendMessage("Run %s" % self._name)
if status:
self._status = JobStatus(self)
......@@ -323,6 +368,11 @@ class IJob(Configurable):
except:
tb = traceback.format_exc()
raise JobError(self,tb)
finally:
if self._name:
self.thread.sendMessage("Stop %s" % self._name)
self.thread.stopThread()
@property
def info(self):
......
......@@ -16,7 +16,8 @@ import collections
import os
import sys
import webbrowser
import zmq
import threading
import wx
import wx.aui as aui
......@@ -64,7 +65,36 @@ def excepthook(error, message, tback):
trace = '\n'.join(trace)
LOGGER(trace,'error',['console','dialog'])
class ZMQServer():
stop_flag = False
def __init__(self):
self.thread = threading.Thread(target=self.start_zmq)
self.thread.start()
def start_zmq(self):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.SNDTIMEO, 1000)
socket.setsockopt(zmq.CONNECT_TIMEOUT, 1000)
socket.bind("tcp://127.0.0.1:5556")
while not self.stop_flag:
# Wait for next request from client and reply
try:
message = socket.recv()
wx.CallAfter(self.post_print, message)
socket.send("OK for %s"%message)
except:pass#self.stop_flag = True
def post_print(self, message):
print "==== Received from ZMQ ===="
print message
def stop(self):
self.stop_flag = True
class MainFrame(wx.Frame):
def __init__(self, parent, title="MDANSE: Molecular Dynamics Analysis for Neutron Scattering Experiments"):
......@@ -84,6 +114,7 @@ class MainFrame(wx.Frame):
sys.stderr = consoleHandler
sys.excepthook = excepthook
self.zmq_server = ZMQServer()
def build_dialog(self):
......@@ -365,6 +396,7 @@ Authors:
d = wx.MessageDialog(None, 'Do you really want to quit ?', 'Question', wx.YES_NO|wx.YES_DEFAULT|wx.ICON_QUESTION)
if d.ShowModal() == wx.ID_YES:
self.Destroy()
self.zmq_server.stop()
def on_start_plotter(self, event = None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment