Commit 9eb4d1c3 authored by Xavier Pacalet's avatar Xavier Pacalet

Adding a ZMQ+PortableByteArray system to get float64 array and int32array.

Now DataAccessor uses this system and not the corba provided functions.
parent 7b850e07
......@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import fr.ill.ics.nscclient.dataprovider.arraytransfer.ZMQClient;
import fr.ill.ics.nomadserver.core.DatabaseAccessor;
import fr.ill.ics.nomadserver.core.DatabaseAccessorHelper;
import fr.ill.ics.nomadserver.core.DatabaseAccessorPackage.BadPropertyTypeException;
......@@ -386,13 +388,25 @@ public class DataAccessor {
}
public int[] getInt32Array(int databaseID, int propertyID) {
Int32ArrayTransferServiceImpl arrayTransferServiceImpl = new Int32ArrayTransferServiceImpl();
//Date d = new Date();
int[] array;
array = ZMQClient.getInt32Array(propertyID);
/*
Date d2 = new Date();
System.out.println("transfer int32 in " + (d2.getTime() - d.getTime()) + "ms");*/
return array;
/*Int32ArrayTransferServiceImpl arrayTransferServiceImpl = new Int32ArrayTransferServiceImpl();
arrayTransferServiceImpl.activate();
databaseAccessor.getInt32Array(databaseID, propertyID, arrayTransferServiceImpl.getCorbaTransferService());
arrayTransferServiceImpl.deactivate();
arrayTransferServiceImpl.deactivate();*/
return arrayTransferServiceImpl.getData();
//return arrayTransferServiceImpl.getData();
}
public void setInt32Array(int databaseID, int propertyID, int[] data) {
......@@ -440,18 +454,21 @@ public class DataAccessor {
}
public double[] getFloat64Array(int databaseID, int propertyID) {
Date d = new Date();
//Date d = new Date();
double[] array;
/*
Float64ArrayTransferServiceImpl arrayTransferServiceImpl = new Float64ArrayTransferServiceImpl();
arrayTransferServiceImpl.activate();
databaseAccessor.getFloat64Array(databaseID, propertyID, arrayTransferServiceImpl.getCorbaTransferService());
arrayTransferServiceImpl.deactivate();
Date d2 = new Date();
System.out.println("transfer in " + (d2.getTime() - d.getTime()) + "ms");
*/
array = ZMQClient.getFloat64Array(propertyID);
return arrayTransferServiceImpl.getData();
/*Date d2 = new Date();
System.out.println("transfer float64 in " + (d2.getTime() - d.getTime()) + "ms");*/
//return arrayTransferServiceImpl.getData();
return array;
}
public void setFloat64Array(int databaseID, int propertyID, double[] data) {
......
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.nscclient.dataprovider.arraytransfer;
public class PortableByteArray {
private enum Endianess {
LITTLE,BIG;
}
private enum EncodedType {
DOUBLE, INT64, INT32, FLOAT;
}
// only used for unserialization
private static Endianess endianness;
private static int size;
private static byte[] encodedData;
private static int pos;
//only used for serialization
private static Endianess output_endian = Endianess.LITTLE;
// methods related to unserialization
private static short readShort() {
short result;
if (endianness == Endianess.BIG) {
result = (short) ((encodedData[pos+1] & 0xff) |
(encodedData[pos+0] & 0xff) << 8);
} else {
result = (short) ((encodedData[pos+0] & 0xff) |
(encodedData[pos+1] & 0xff) << 8);
}
pos = pos + 2;
return result;
}
private static int readInt() {
int result;
if (endianness == Endianess.BIG) {
result = (int) ((encodedData[pos+3] & 0xff) |
(encodedData[pos+2] & 0xff) << 8 |
(encodedData[pos+1] & 0xff) << 16 |
(encodedData[pos+0] & 0xff) << 24);
} else {
result = (int) ((encodedData[pos+0] & 0xff) |
(encodedData[pos+1] & 0xff) << 8 |
(encodedData[pos+2] & 0xff) << 16 |
(encodedData[pos+3] & 0xff) << 24);
}
pos = pos + 4;
return result;
}
private static long readLong() {
long l;
if (endianness == Endianess.BIG) {
l = (long) ((encodedData[pos+7] & 0xff) |
(encodedData[pos+6] & 0xff) << 8 |
(encodedData[pos+5] & 0xff) << 16 |
(encodedData[pos+4] & 0xff) << 24 |
(long)((encodedData[pos+3] & 0xff) << 0 |
(encodedData[pos+2] & 0xff) << 8 |
(encodedData[pos+1] & 0xff) << 16 |
(encodedData[pos+0] & 0xff) << 24) << 32);
} else {
l = (long) ((encodedData[pos] & 0xff) |
(encodedData[pos+1] & 0xff) << 8 |
(encodedData[pos+2] & 0xff) << 16 |
(encodedData[pos+3] & 0xff) << 24 |
(long)((encodedData[pos+4] & 0xff) << 0 |
(encodedData[pos+5] & 0xff) << 8 |
(encodedData[pos+6] & 0xff) << 16 |
(encodedData[pos+7] & 0xff) << 24) << 32);
}
pos = pos + 8;
return l;
}
private static void initUnserialize(EncodedType type) throws SizeException,
TypeException {
pos = 0;
endianness = Endianess.BIG;
int bytesLength;
if (type == EncodedType.DOUBLE || type == EncodedType.INT64) {
bytesLength = 8;
} else {
bytesLength = 4;
}
// Detect the endieness of the input
short endian = readShort();
if (endian == 1) {
endianness = Endianess.BIG;
} else {
endianness = Endianess.LITTLE;
}
short type_data = readShort();
if (type_data != type.ordinal()) {
throw new TypeException();
}
size = readInt();
if (size*bytesLength != encodedData.length-8) {
throw new SizeException();
}
}
public static double[] parseToDouble(byte[] inputArray) throws SizeException,
TypeException {
// Parse byte array to retrieve double array
double[] result = null;
encodedData = inputArray;
initUnserialize(EncodedType.DOUBLE);
result = new double[size];
// Parse the array
for (int i = 0; i < size; i++) {
result[i] = Double.longBitsToDouble(readLong());
}
return result;
}
public static int[] parseToInt(byte[] inputArray) throws SizeException,
TypeException {
// Parse byte array to retrieve int array
int[] result = null;
encodedData = inputArray;
initUnserialize(EncodedType.INT32);
result = new int[size];
// Parse the array
for (int i = 0; i < size; i++) {
result[i] = readInt();
}
return result;
}
public static float[] parseToFLoat(byte[] inputArray) throws SizeException,
TypeException {
// Parse byte array to retrieve float array
float[] result = null;
encodedData = inputArray;
initUnserialize(EncodedType.FLOAT);
result = new float[size];
// Parse the array
for (int i = 0; i < size; i++) {
result[i] = Float.intBitsToFloat(readInt());
}
return result;
}
public static long[] parseToLong(byte[] inputArray) throws SizeException,
TypeException {
// Parse byte array to retrieve long array
long[] result = null;
encodedData = inputArray;
initUnserialize(EncodedType.INT64);
result = new long[size];
// Parse the array
for (int i = 0; i < size; i++) {
result[i] = readLong();
}
return result;
}
// method related to serialization
private static void writeShort(short value) {
if (output_endian == Endianess.BIG) {
encodedData[pos] = (byte)((value >>> 8) & 0xFF);
encodedData[pos+1] = (byte)(value & 0xFF);
} else {
encodedData[pos+1] = (byte)((value >>> 8) & 0xFF);
encodedData[pos] = (byte)(value & 0xFF);
}
pos = pos + 2;
}
private static void writeInt(int value) {
if (output_endian == Endianess.BIG) {
encodedData[pos] = (byte)((value >>> 24) & 0xFF);
encodedData[pos+1] = (byte)((value >>> 16) & 0xFF);
encodedData[pos+2] = (byte)((value >>> 8) & 0xFF);
encodedData[pos+3] = (byte)(value & 0xFF);
} else {
encodedData[pos+3] = (byte)((value >>> 24) & 0xFF);
encodedData[pos+2] = (byte)((value >>> 16) & 0xFF);
encodedData[pos+1] = (byte)((value >>> 8) & 0xFF);
encodedData[pos] = (byte)(value & 0xFF);
}
pos = pos + 4;
}
private static void writeLong(long value) {
if (output_endian == Endianess.BIG) {
encodedData[pos] = (byte)((value >>> 56) & 0xFF);
encodedData[pos+1] = (byte)((value >>> 48) & 0xFF);
encodedData[pos+2] = (byte)((value >>> 40) & 0xFF);
encodedData[pos+3] = (byte)((value >>> 32) & 0xFF);
encodedData[pos+4] = (byte)((value >>> 24) & 0xFF);
encodedData[pos+5] = (byte)((value >>> 16) & 0xFF);
encodedData[pos+6] = (byte)((value >>> 8) & 0xFF);
encodedData[pos+7] = (byte) (value & 0xFF);
} else {
encodedData[pos+7] = (byte)((value >>> 56) & 0xFF);
encodedData[pos+6] = (byte)((value >>> 48) & 0xFF);
encodedData[pos+5] = (byte)((value >>> 40) & 0xFF);
encodedData[pos+4] = (byte)((value >>> 32) & 0xFF);
encodedData[pos+3] = (byte)((value >>> 24) & 0xFF);
encodedData[pos+2] = (byte)((value >>> 16) & 0xFF);
encodedData[pos+1] = (byte)((value >>> 8) & 0xFF);
encodedData[pos] = (byte) (value & 0xFF);
}
pos += 8;
}
private static void initSerialize(EncodedType type, int length) {
pos = 0;
encodedData = new byte[length*8+8];
writeShort((short)1);
writeShort((short)type.ordinal());
writeInt(length);
}
public static byte[] serializeFromDouble(double[] inputArray) {
initSerialize(EncodedType.DOUBLE,inputArray.length);
long d = 0;
for (int i = 0; i < inputArray.length; i++) {
d = Double.doubleToLongBits(inputArray[i]);
writeLong(d);
}
return encodedData;
}
public static byte[] serializeFromLong(long[] inputArray) {
initSerialize(EncodedType.INT64,inputArray.length);
for (int i = 0; i < inputArray.length; i++) {
writeLong(inputArray[i]);
}
return encodedData;
}
public static byte[] serializeFromInt(int[] inputArray) {
initSerialize(EncodedType.INT32,inputArray.length);
for (int i = 0; i < inputArray.length; i++) {
writeInt(inputArray[i]);
}
return encodedData;
}
public static byte[] serializeFromFloat(float[] inputArray) {
initSerialize(EncodedType.FLOAT,inputArray.length);
int d = 0;
for (int i = 0; i < inputArray.length; i++) {
d = Float.floatToIntBits(inputArray[i]);
writeInt(d);
}
return encodedData;
}
public static void setOutputEndianess(int number) {
if (number == 1) {
output_endian = Endianess.LITTLE;
} else {
output_endian = Endianess.BIG;
}
}
}
\ No newline at end of file
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.nscclient.dataprovider.arraytransfer;
public class SizeException extends Exception {
/**
*
*/
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.nscclient.dataprovider.arraytransfer;
public class TypeException extends Exception {
/**
*
*/
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.nscclient.dataprovider.arraytransfer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZMQ.Socket;
public class ZMQClient {
private static boolean init;
private static Socket client;
private static ZContext ctx;
private static String endpoint;
public static void loadConfiguration() {
Properties prop = new Properties();
InputStream input = null;
try {
// load file
File file = new File("./src/manager/config.properties");
input = new FileInputStream(file);
prop.load(input);
endpoint = prop.getProperty("ZMQManagerServerIP") +
":" + prop.getProperty("5555");
} catch (IOException io) {
endpoint = "127.0.0.1:5555";
} finally {
try {
input.close();
} catch (IOException e) {
} catch (NullPointerException e) {
}
}
}
private static byte[] getArray(int type, int IdProperty) {
if (!init) {
loadConfiguration();
init = true;
ctx = new ZContext();
client = ctx.createSocket(ZMQ.REQ);
client.connect("tcp://"+endpoint);
}
ZMsg msg = new ZMsg();
ByteBuffer b = ByteBuffer.allocate(64);
// the parameter should be in little endian, by convention
b.putInt(Integer.reverseBytes(type));
b.putInt(Integer.reverseBytes(IdProperty));
msg.add(b.array());
msg.send(client);
msg = ZMsg.recvMsg(client);
return msg.getFirst().getData();
}
public static double[] getFloat64Array(int IdProperty) {
double[] array = null;
byte[] data = getArray(1,IdProperty);
try {
array = PortableByteArray.parseToDouble(data);
} catch (Exception e) {
}
return array;
}
public static int[] getInt32Array(int IdProperty) {
int[] array = null;
byte[] data = getArray(0,IdProperty);
try {
array = PortableByteArray.parseToInt(data);
} catch (Exception e) {
}
return array;
}
}
\ No newline at end of file
ZMQManagerServerIP=127.0.0.1
ZMQManagerServerPort=5555
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment