Commit 96d8feb5 authored by Remi Perenon's avatar Remi Perenon

First ZMQ communication scheme

parent 270e340a
Pipeline #4494 failed with stages
in 62 minutes and 28 seconds
......@@ -33,12 +33,15 @@ Created on Mar 30, 2015
import abc
import glob
import os
import Queue as queue
import random
import stat
import string
import subprocess
import sys
import threading
import traceback
import zmq
from MDANSE import PLATFORM, REGISTRY
from MDANSE.Core.Error import Error
......@@ -46,6 +49,39 @@ from MDANSE.Framework.Configurable import Configurable
from MDANSE.Framework.Jobs.JobStatus import JobStatus
from MDANSE.Framework.OutputVariables.IOutputVariable import OutputData
class ZMQThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self, target=self.mainLoop)
self.stop_flag = False
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 1000)
self.socket.setsockopt(zmq.SNDTIMEO, 1000)
self.socket.setsockopt(zmq.CONNECT_TIMEOUT, 1000)
self.messages = queue.Queue()
def mainLoop(self):
self.socket.connect("tcp://localhost:5556")
while not self.stop_flag:
while not self.messages.empty():
message = self.messages.get()
try:
self.socket.send(message)
# Get the reply.
self.socket.recv()
except:
pass
self.socket.close()
def stopThread(self):
self.stop_flag = True
def sendMessage(self, message):
self.messages.put(message)
class JobError(Error):
'''
This class handles any exception related to IJob-derived objects
......@@ -308,10 +344,12 @@ class IJob(Configurable):
"""
Run the job.
"""
self._name = None
try:
self._name = "%s_%s" % (self._type,IJob.define_unique_name())
self.thread = ZMQThread()
self.thread.start()
self.thread.sendMessage("Run %s" % self._name)
if status:
self._status = JobStatus(self)
......@@ -341,6 +379,11 @@ class IJob(Configurable):
except:
tb = traceback.format_exc()
raise JobError(self,tb)
finally:
if self._name:
self.thread.sendMessage("Stop %s" % self._name)
self.thread.stopThread()
@property
def info(self):
......
......@@ -84,16 +84,36 @@ def excepthook(error, message, tback):
LOGGER(trace,'error',['console','dialog'])
def start_zmq():
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://127.0.0.1:5556")
print "Server started"
while True:
# Wait for next request from client
message = socket.recv()
socket.send("OK for %s"%message)
class ZMQServer():
stop_flag = False
def __init__(self):
self.thread = threading.Thread(target=self.start_zmq)
self.thread.start()
def start_zmq(self):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.SNDTIMEO, 1000)
socket.setsockopt(zmq.CONNECT_TIMEOUT, 1000)
socket.bind("tcp://127.0.0.1:5556")
while not self.stop_flag:
# Wait for next request from client and reply
try:
message = socket.recv()
wx.CallAfter(self.post_print, message)
socket.send("OK for %s"%message)
except:pass#self.stop_flag = True
context.destroy()
def post_print(self, message):
print "==== Received from ZMQ ===="
print message
def stop(self):
self.stop_flag = True
class MainFrame(wx.Frame):
def __init__(self, parent, title="MDANSE: Molecular Dynamics Analysis for Neutron Scattering Experiments"):
......@@ -113,10 +133,7 @@ class MainFrame(wx.Frame):
sys.stderr = consoleHandler
sys.excepthook = excepthook
thread = threading.Thread(target=start_zmq)
thread.start()
#start_zmq()
self.zmq_server = ZMQServer()
def build_dialog(self):
......@@ -398,6 +415,7 @@ Authors:
d = wx.MessageDialog(None, 'Do you really want to quit ?', 'Question', wx.YES_NO|wx.YES_DEFAULT|wx.ICON_QUESTION)
if d.ShowModal() == wx.ID_YES:
self.Destroy()
self.zmq_server.stop()
def on_start_plotter(self, event = None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment