This closes #718

This commit is contained in:
Clebert Suconic 2016-08-14 12:48:36 -04:00
commit 113c2a3477
6 changed files with 104 additions and 136 deletions

View File

@ -38,7 +38,7 @@ import org.jboss.logging.annotations.MessageLogger;
* so an INFO message would be 191000 to 191999 * so an INFO message would be 191000 to 191999
*/ */
@MessageLogger(projectCode = "AMQ") @MessageLogger(projectCode = "AMQ")
public interface ActiveMQVertxLogger extends BasicLogger { interface ActiveMQVertxLogger extends BasicLogger {
/** /**
* The vertx logger. * The vertx logger.

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.integration.vertx; package org.apache.activemq.artemis.integration.vertx;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -40,7 +39,7 @@ import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager; import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
public class IncomingVertxEventHandler implements ConnectorService { class IncomingVertxEventHandler implements ConnectorService {
private final String connectorName; private final String connectorName;
@ -68,11 +67,10 @@ public class IncomingVertxEventHandler implements ConnectorService {
private boolean isStarted = false; private boolean isStarted = false;
public IncomingVertxEventHandler(String connectorName, IncomingVertxEventHandler(String connectorName,
Map<String, Object> configuration, Map<String, Object> configuration,
StorageManager storageManager, StorageManager storageManager,
PostOffice postOffice, PostOffice postOffice) {
ScheduledExecutorService scheduledThreadPool) {
this.connectorName = connectorName; this.connectorName = connectorName;
this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
@ -271,7 +269,6 @@ public class IncomingVertxEventHandler implements ConnectorService {
else if (body instanceof ReplyException) { else if (body instanceof ReplyException) {
return VertxConstants.TYPE_REPLY_FAILURE; return VertxConstants.TYPE_REPLY_FAILURE;
} }
throw new IllegalArgumentException("Type not supported: " + message); throw new IllegalArgumentException("Type not supported: " + message);
} }

View File

@ -18,12 +18,10 @@ package org.apache.activemq.artemis.integration.vertx;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService; import org.apache.activemq.artemis.core.server.ConnectorService;
@ -43,7 +41,7 @@ import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager; import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
public class OutgoingVertxEventHandler implements Consumer, ConnectorService { class OutgoingVertxEventHandler implements Consumer, ConnectorService {
private final String connectorName; private final String connectorName;
@ -73,11 +71,7 @@ public class OutgoingVertxEventHandler implements Consumer, ConnectorService {
private boolean isStarted = false; private boolean isStarted = false;
public OutgoingVertxEventHandler(String connectorName, OutgoingVertxEventHandler(String connectorName, Map<String, Object> configuration, PostOffice postOffice) {
Map<String, Object> configuration,
StorageManager storageManager,
PostOffice postOffice,
ScheduledExecutorService scheduledThreadPool) {
this.connectorName = connectorName; this.connectorName = connectorName;
this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
this.postOffice = postOffice; this.postOffice = postOffice;
@ -164,7 +158,7 @@ public class OutgoingVertxEventHandler implements Consumer, ConnectorService {
ServerMessage message = ref.getMessage(); ServerMessage message = ref.getMessage();
Object vertxMsgBody = null; Object vertxMsgBody;
// extract information from message // extract information from message
Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE); Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);

View File

@ -34,7 +34,7 @@ public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFac
PostOffice postOffice, PostOffice postOffice,
ScheduledExecutorService scheduledThreadPool) { ScheduledExecutorService scheduledThreadPool) {
return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool); return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice);
} }

View File

@ -33,7 +33,7 @@ public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFac
StorageManager storageManager, StorageManager storageManager,
PostOffice postOffice, PostOffice postOffice,
ScheduledExecutorService scheduledThreadPool) { ScheduledExecutorService scheduledThreadPool) {
return new OutgoingVertxEventHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool); return new OutgoingVertxEventHandler(connectorName, configuration, postOffice);
} }
@Override @Override

View File

@ -52,27 +52,27 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
*/ */
public class ActiveMQVertxUnitTest extends ActiveMQTestBase { public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
protected PlatformManager vertxManager; private PlatformManager vertxManager;
protected ActiveMQServer server; private ActiveMQServer server;
protected String host = "localhost"; private String host = "localhost";
protected String port = "0"; private String port = "0";
protected String incomingQueue1 = "vertxTestIncomingQueue1"; private String incomingQueue1 = "vertxTestIncomingQueue1";
protected String incomingVertxAddress1 = "org.apache.activemq.test.incoming1"; private String incomingVertxAddress1 = "org.apache.activemq.test.incoming1";
//outgoing using send //outgoing using send
protected String inOutQueue1 = "vertxTestInOutQueue1"; private String inOutQueue1 = "vertxTestInOutQueue1";
protected String incomingVertxAddress2 = "org.apache.activemq.test.incoming2"; private String incomingVertxAddress2 = "org.apache.activemq.test.incoming2";
protected String outgoingVertxAddress1 = "org.apache.activemq.test.outgoing1"; private String outgoingVertxAddress1 = "org.apache.activemq.test.outgoing1";
//outgoing using publish //outgoing using publish
protected String inOutQueue2 = "vertxTestInOutQueue2"; private String inOutQueue2 = "vertxTestInOutQueue2";
protected String incomingVertxAddress3 = "org.apache.activemq.test.incoming3"; private String incomingVertxAddress3 = "org.apache.activemq.test.incoming3";
protected String outgoingVertxAddress2 = "org.apache.activemq.test.outgoing2"; private String outgoingVertxAddress2 = "org.apache.activemq.test.outgoing2";
// Vertx is changing the classLoader to null.. this will preserve the original classloader // Vertx is changing the classLoader to null.. this will preserve the original classloader
ClassLoader contextClassLoader; private ClassLoader contextClassLoader;
//subclasses may override this method //subclasses may override this method
//in order to get a server with different connector services //in order to get a server with different connector services
@ -84,53 +84,28 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
super.setUp(); super.setUp();
//all queues //all queues
CoreQueueConfiguration qc1 = new CoreQueueConfiguration().setAddress(incomingQueue1).setName(incomingQueue1); CoreQueueConfiguration qc1 = createCoreQueueConfiguration(incomingQueue1);
CoreQueueConfiguration qc2 = new CoreQueueConfiguration().setAddress(inOutQueue1).setName(inOutQueue1); CoreQueueConfiguration qc2 = createCoreQueueConfiguration(inOutQueue1);
CoreQueueConfiguration qc3 = new CoreQueueConfiguration().setAddress(inOutQueue2).setName(inOutQueue2); CoreQueueConfiguration qc3 = createCoreQueueConfiguration(inOutQueue2);
//incoming //incoming
HashMap<String, Object> config1 = new HashMap<>(); HashMap<String, Object> config1 = createIncomingConnectionConfig(incomingVertxAddress1, incomingQueue1);
config1.put(VertxConstants.HOST, host); ConnectorServiceConfiguration inconf1 = createIncomingConnectorServiceConfiguration(config1, "test-vertx-incoming-connector1");
config1.put(VertxConstants.PORT, port);
config1.put(VertxConstants.QUEUE_NAME, incomingQueue1);
config1.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress1);
ConnectorServiceConfiguration inconf1 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config1).setName("test-vertx-incoming-connector1");
//outgoing send style //outgoing send style
HashMap<String, Object> config2 = new HashMap<>(); HashMap<String, Object> config2 = createOutgoingConnectionConfig(inOutQueue1, incomingVertxAddress2);
config2.put(VertxConstants.HOST, host); ConnectorServiceConfiguration inconf2 = createIncomingConnectorServiceConfiguration(config2, "test-vertx-incoming-connector2");
config2.put(VertxConstants.PORT, port);
config2.put(VertxConstants.QUEUE_NAME, inOutQueue1);
config2.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress2);
ConnectorServiceConfiguration inconf2 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config2).setName("test-vertx-incoming-connector2"); HashMap<String, Object> config3 = createOutgoingConnectionConfig(inOutQueue1, outgoingVertxAddress1);
ConnectorServiceConfiguration outconf1 = createOutgoingConnectorServiceConfiguration(config3, "test-vertx-outgoing-connector1");
HashMap<String, Object> config3 = new HashMap<>();
config3.put(VertxConstants.HOST, host);
config3.put(VertxConstants.PORT, port);
config3.put(VertxConstants.QUEUE_NAME, inOutQueue1);
config3.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress1);
ConnectorServiceConfiguration outconf1 = new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config3).setName("test-vertx-outgoing-connector1");
//outgoing publish style //outgoing publish style
HashMap<String, Object> config4 = new HashMap<>(); HashMap<String, Object> config4 = createOutgoingConnectionConfig(inOutQueue2, incomingVertxAddress3);
config4.put(VertxConstants.HOST, host); ConnectorServiceConfiguration inconf3 = createIncomingConnectorServiceConfiguration(config4, "test-vertx-incoming-connector3");
config4.put(VertxConstants.PORT, port);
config4.put(VertxConstants.QUEUE_NAME, inOutQueue2);
config4.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress3);
ConnectorServiceConfiguration inconf3 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config4).setName("test-vertx-incoming-connector3"); HashMap<String, Object> config5 = createOutgoingConnectionConfig(inOutQueue2, outgoingVertxAddress2);
HashMap<String, Object> config5 = new HashMap<>();
config5.put(VertxConstants.HOST, host);
config5.put(VertxConstants.PORT, port);
config5.put(VertxConstants.QUEUE_NAME, inOutQueue2);
config5.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress2);
config5.put(VertxConstants.VERTX_PUBLISH, "true"); config5.put(VertxConstants.VERTX_PUBLISH, "true");
ConnectorServiceConfiguration outconf2 = createOutgoingConnectorServiceConfiguration(config5, "test-vertx-outgoing-connector2");
ConnectorServiceConfiguration outconf2 = new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config5).setName("test-vertx-outgoing-connector2");
Configuration configuration = createDefaultInVMConfig().addQueueConfiguration(qc1).addQueueConfiguration(qc2).addQueueConfiguration(qc3).addConnectorServiceConfiguration(inconf1).addConnectorServiceConfiguration(inconf2).addConnectorServiceConfiguration(outconf1).addConnectorServiceConfiguration(inconf3).addConnectorServiceConfiguration(outconf2); Configuration configuration = createDefaultInVMConfig().addQueueConfiguration(qc1).addQueueConfiguration(qc2).addQueueConfiguration(qc3).addConnectorServiceConfiguration(inconf1).addConnectorServiceConfiguration(inconf2).addConnectorServiceConfiguration(outconf1).addConnectorServiceConfiguration(inconf3).addConnectorServiceConfiguration(outconf2);
@ -140,8 +115,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
/** /**
* (vertx events) ===> (incomingQueue1) ===> (activemq consumer) * (vertx events) ===> (incomingQueue1) ===> (activemq consumer)
*
* @throws Exception
*/ */
@Test @Test
public void testIncomingEvents() throws Exception { public void testIncomingEvents() throws Exception {
@ -211,7 +184,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
} }
//send a byte //send a byte
Byte aByte = new Byte((byte) 15); Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress1, aByte); vertx.eventBus().send(incomingVertxAddress1, aByte);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -222,7 +195,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aByte, recvByte); assertEquals(aByte, recvByte);
//send a Character //send a Character
Character aChar = new Character('a'); Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress1, aChar); vertx.eventBus().send(incomingVertxAddress1, aChar);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -232,7 +205,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aChar, recvChar); assertEquals(aChar, recvChar);
//send a Double //send a Double
Double aDouble = new Double(1234.56d); Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress1, aDouble); vertx.eventBus().send(incomingVertxAddress1, aDouble);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -242,7 +215,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aDouble, recvDouble); assertEquals(aDouble, recvDouble);
//send a Float //send a Float
Float aFloat = new Float(1234.56f); Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress1, aFloat); vertx.eventBus().send(incomingVertxAddress1, aFloat);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -252,7 +225,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aFloat, recvFloat); assertEquals(aFloat, recvFloat);
//send an Integer //send an Integer
Integer aInt = new Integer(1234); Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress1, aInt); vertx.eventBus().send(incomingVertxAddress1, aInt);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -262,7 +235,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aInt, recvInt); assertEquals(aInt, recvInt);
//send a Long //send a Long
Long aLong = new Long(12345678L); Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress1, aLong); vertx.eventBus().send(incomingVertxAddress1, aLong);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -272,7 +245,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
assertEquals(aLong, recvLong); assertEquals(aLong, recvLong);
//send a Short //send a Short
Short aShort = new Short((short) 321); Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress1, aShort); vertx.eventBus().send(incomingVertxAddress1, aShort);
msg = receiveFromQueue(incomingQueue1); msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg); assertNotNull(msg);
@ -339,25 +312,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
recvJsonString = msg.getBodyBuffer().readString(); recvJsonString = msg.getBodyBuffer().readString();
System.out.println("==== received json: " + recvJsonString); System.out.println("==== received json: " + recvJsonString);
assertEquals(aJsonArray, new JsonArray(recvJsonString)); assertEquals(aJsonArray, new JsonArray(recvJsonString));
//send a ReplyFailure
/*
ReplyFailure replyFailure = ReplyFailure.TIMEOUT;
int fakeFailureCode = 1234;
String failureMsg = "Test failure message";
ReplyException aReplyEx = new ReplyException(replyFailure, fakeFailureCode, failureMsg);
vertx.eventBus().send(incomingVertxAddress1, aReplyEx);
msg = receiveFromQueue(incomingQueue1);
assertNotNull(msg);
vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
assertEquals(VertxConstants.TYPE_REPLY_FAILURE, vertxType);
int recvType = msg.getBodyBuffer().readInt();
int recvCode = msg.getBodyBuffer().readInt();
String recvFailureMsg = msg.getBodyBuffer().readString();
assertEquals(replyFailure.toInt(), recvType);
assertEquals(fakeFailureCode, recvCode);
assertEquals(failureMsg, recvFailureMsg);
*/
} }
/** /**
@ -365,8 +319,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
* ===> (inOutQueue1) * ===> (inOutQueue1)
* ===> (outgoing handler) * ===> (outgoing handler)
* ===> send to vertx (outgoingVertxAddress1) * ===> send to vertx (outgoingVertxAddress1)
*
* @throws Exception
*/ */
@Test @Test
public void testOutgoingEvents() throws Exception { public void testOutgoingEvents() throws Exception {
@ -402,43 +354,43 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
handler.checkByteArrayMessageReceived(byteArray); handler.checkByteArrayMessageReceived(byteArray);
//send a byte //send a byte
Byte aByte = new Byte((byte) 15); Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress2, aByte); vertx.eventBus().send(incomingVertxAddress2, aByte);
handler.checkByteMessageReceived(aByte); handler.checkByteMessageReceived(aByte);
//send a Character //send a Character
Character aChar = new Character('a'); Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress2, aChar); vertx.eventBus().send(incomingVertxAddress2, aChar);
handler.checkCharacterMessageReceived(aChar); handler.checkCharacterMessageReceived(aChar);
//send a Double //send a Double
Double aDouble = new Double(1234.56d); Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress2, aDouble); vertx.eventBus().send(incomingVertxAddress2, aDouble);
handler.checkDoubleMessageReceived(aDouble); handler.checkDoubleMessageReceived(aDouble);
//send a Float //send a Float
Float aFloat = new Float(1234.56f); Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress2, aFloat); vertx.eventBus().send(incomingVertxAddress2, aFloat);
handler.checkFloatMessageReceived(aFloat); handler.checkFloatMessageReceived(aFloat);
//send an Integer //send an Integer
Integer aInt = new Integer(1234); Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress2, aInt); vertx.eventBus().send(incomingVertxAddress2, aInt);
handler.checkIntegerMessageReceived(aInt); handler.checkIntegerMessageReceived(aInt);
//send a Long //send a Long
Long aLong = new Long(12345678L); Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress2, aLong); vertx.eventBus().send(incomingVertxAddress2, aLong);
handler.checkLongMessageReceived(aLong); handler.checkLongMessageReceived(aLong);
//send a Short //send a Short
Short aShort = new Short((short) 321); Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress2, aShort); vertx.eventBus().send(incomingVertxAddress2, aShort);
handler.checkShortMessageReceived(aShort); handler.checkShortMessageReceived(aShort);
@ -496,8 +448,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
* ===> (inOutQueue2) * ===> (inOutQueue2)
* ===> (outgoing handler) * ===> (outgoing handler)
* ===> public to vertx (outgoingVertxAddress2) * ===> public to vertx (outgoingVertxAddress2)
*
* @throws Exception
*/ */
@Test @Test
public void testOutgoingEvents2() throws Exception { public void testOutgoingEvents2() throws Exception {
@ -539,49 +489,49 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
handler2.checkByteArrayMessageReceived(byteArray); handler2.checkByteArrayMessageReceived(byteArray);
//send a byte //send a byte
Byte aByte = new Byte((byte) 15); Byte aByte = (byte) 15;
vertx.eventBus().send(incomingVertxAddress3, aByte); vertx.eventBus().send(incomingVertxAddress3, aByte);
handler1.checkByteMessageReceived(aByte); handler1.checkByteMessageReceived(aByte);
handler2.checkByteMessageReceived(aByte); handler2.checkByteMessageReceived(aByte);
//send a Character //send a Character
Character aChar = new Character('a'); Character aChar = 'a';
vertx.eventBus().send(incomingVertxAddress3, aChar); vertx.eventBus().send(incomingVertxAddress3, aChar);
handler1.checkCharacterMessageReceived(aChar); handler1.checkCharacterMessageReceived(aChar);
handler2.checkCharacterMessageReceived(aChar); handler2.checkCharacterMessageReceived(aChar);
//send a Double //send a Double
Double aDouble = new Double(1234.56d); Double aDouble = 1234.56d;
vertx.eventBus().send(incomingVertxAddress3, aDouble); vertx.eventBus().send(incomingVertxAddress3, aDouble);
handler1.checkDoubleMessageReceived(aDouble); handler1.checkDoubleMessageReceived(aDouble);
handler2.checkDoubleMessageReceived(aDouble); handler2.checkDoubleMessageReceived(aDouble);
//send a Float //send a Float
Float aFloat = new Float(1234.56f); Float aFloat = 1234.56f;
vertx.eventBus().send(incomingVertxAddress3, aFloat); vertx.eventBus().send(incomingVertxAddress3, aFloat);
handler1.checkFloatMessageReceived(aFloat); handler1.checkFloatMessageReceived(aFloat);
handler2.checkFloatMessageReceived(aFloat); handler2.checkFloatMessageReceived(aFloat);
//send an Integer //send an Integer
Integer aInt = new Integer(1234); Integer aInt = 1234;
vertx.eventBus().send(incomingVertxAddress3, aInt); vertx.eventBus().send(incomingVertxAddress3, aInt);
handler1.checkIntegerMessageReceived(aInt); handler1.checkIntegerMessageReceived(aInt);
handler2.checkIntegerMessageReceived(aInt); handler2.checkIntegerMessageReceived(aInt);
//send a Long //send a Long
Long aLong = new Long(12345678L); Long aLong = 12345678L;
vertx.eventBus().send(incomingVertxAddress3, aLong); vertx.eventBus().send(incomingVertxAddress3, aLong);
handler1.checkLongMessageReceived(aLong); handler1.checkLongMessageReceived(aLong);
handler2.checkLongMessageReceived(aLong); handler2.checkLongMessageReceived(aLong);
//send a Short //send a Short
Short aShort = new Short((short) 321); Short aShort = (short) 321;
vertx.eventBus().send(incomingVertxAddress3, aShort); vertx.eventBus().send(incomingVertxAddress3, aShort);
handler1.checkShortMessageReceived(aShort); handler1.checkShortMessageReceived(aShort);
@ -641,9 +591,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private ClientMessage receiveFromQueue(String queueName) throws Exception { private ClientMessage receiveFromQueue(String queueName) throws Exception {
ClientMessage msg = null; ClientMessage msg = null;
try (ServerLocator locator = createInVMNonHALocator(); try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true)) {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true)) {
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
session.start(); session.start();
@ -657,9 +605,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private void createVertxService() { private void createVertxService() {
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName()); System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port), host); vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port), host);
// vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port),
// host, quorumSize, haGroup + System.currentTimeMillis());
} }
private class VertxTestHandler implements Handler<BaseMessage<?>> { private class VertxTestHandler implements Handler<BaseMessage<?>> {
@ -675,61 +620,61 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
} }
} }
public void checkJsonArrayMessageReceived(JsonArray aJsonArray) { void checkJsonArrayMessageReceived(JsonArray aJsonArray) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
JsonArray body = (JsonArray) msg.body(); JsonArray body = (JsonArray) msg.body();
assertEquals(aJsonArray, body); assertEquals(aJsonArray, body);
} }
public void checkJsonObjectMessageReceived(final JsonObject aJsonObj) { void checkJsonObjectMessageReceived(final JsonObject aJsonObj) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
JsonObject body = (JsonObject) msg.body(); JsonObject body = (JsonObject) msg.body();
assertEquals(aJsonObj, body); assertEquals(aJsonObj, body);
} }
public void checkShortMessageReceived(final Short aShort) { void checkShortMessageReceived(final Short aShort) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Short body = (Short) msg.body(); Short body = (Short) msg.body();
assertEquals(aShort, body); assertEquals(aShort, body);
} }
public void checkLongMessageReceived(final Long aLong) { void checkLongMessageReceived(final Long aLong) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Long body = (Long) msg.body(); Long body = (Long) msg.body();
assertEquals(aLong, body); assertEquals(aLong, body);
} }
public void checkIntegerMessageReceived(final Integer aInt) { void checkIntegerMessageReceived(final Integer aInt) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Integer body = (Integer) msg.body(); Integer body = (Integer) msg.body();
assertEquals(aInt, body); assertEquals(aInt, body);
} }
public void checkFloatMessageReceived(final Float aFloat) { void checkFloatMessageReceived(final Float aFloat) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Float body = (Float) msg.body(); Float body = (Float) msg.body();
assertEquals(aFloat, body); assertEquals(aFloat, body);
} }
public void checkDoubleMessageReceived(final Double aDouble) { void checkDoubleMessageReceived(final Double aDouble) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Double body = (Double) msg.body(); Double body = (Double) msg.body();
assertEquals(aDouble, body); assertEquals(aDouble, body);
} }
public void checkCharacterMessageReceived(final Character aChar) { void checkCharacterMessageReceived(final Character aChar) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Character body = (Character) msg.body(); Character body = (Character) msg.body();
assertEquals(aChar, body); assertEquals(aChar, body);
} }
public void checkByteMessageReceived(final Byte aByte) { void checkByteMessageReceived(final Byte aByte) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Byte body = (Byte) msg.body(); Byte body = (Byte) msg.body();
assertEquals(aByte, body); assertEquals(aByte, body);
} }
public void checkByteArrayMessageReceived(final byte[] byteArray) { void checkByteArrayMessageReceived(final byte[] byteArray) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
byte[] body = (byte[]) msg.body(); byte[] body = (byte[]) msg.body();
assertEquals(byteArray.length, body.length); assertEquals(byteArray.length, body.length);
@ -738,19 +683,19 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
} }
} }
public void checkBooleanMessageReceived(final Boolean boolValue) { void checkBooleanMessageReceived(final Boolean boolValue) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Boolean body = (Boolean) msg.body(); Boolean body = (Boolean) msg.body();
assertEquals(boolValue, body); assertEquals(boolValue, body);
} }
public void checkStringMessageReceived(final String str) { void checkStringMessageReceived(final String str) {
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
String body = (String) msg.body(); String body = (String) msg.body();
assertEquals(str, body); assertEquals(str, body);
} }
public void checkBufferMessageReceived(final Buffer buffer) { void checkBufferMessageReceived(final Buffer buffer) {
byte[] source = buffer.getBytes(); byte[] source = buffer.getBytes();
BaseMessage<?> msg = waitMessage(); BaseMessage<?> msg = waitMessage();
Buffer body = (Buffer) msg.body(); Buffer body = (Buffer) msg.body();
@ -792,4 +737,36 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
Thread.currentThread().setContextClassLoader(contextClassLoader); Thread.currentThread().setContextClassLoader(contextClassLoader);
super.tearDown(); super.tearDown();
} }
private CoreQueueConfiguration createCoreQueueConfiguration(String queueName) {
return new CoreQueueConfiguration().setAddress(queueName).setName(queueName);
}
private ConnectorServiceConfiguration createOutgoingConnectorServiceConfiguration(HashMap<String, Object> config,
String name) {
return new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config).setName(name);
}
private ConnectorServiceConfiguration createIncomingConnectorServiceConfiguration(HashMap<String, Object> config,
String name) {
return new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config).setName(name);
}
private HashMap<String, Object> createIncomingConnectionConfig(String vertxAddress, String incomingQueue) {
HashMap<String, Object> config1 = new HashMap<>();
config1.put(VertxConstants.HOST, host);
config1.put(VertxConstants.PORT, port);
config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress);
config1.put(VertxConstants.QUEUE_NAME, incomingQueue);
return config1;
}
private HashMap<String, Object> createOutgoingConnectionConfig(String queueName, String vertxAddress) {
HashMap<String, Object> config1 = new HashMap<>();
config1.put(VertxConstants.HOST, host);
config1.put(VertxConstants.PORT, port);
config1.put(VertxConstants.QUEUE_NAME, queueName);
config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress);
return config1;
}
} }