From 4ae18782da9bcd1277dda29e07aae5001fe43f03 Mon Sep 17 00:00:00 2001 From: Bennet Schulz Date: Thu, 11 Aug 2016 20:45:46 +0200 Subject: [PATCH] ARTEMIS-680 some refactorings within Vert.X integration and corresponding tests --- .../vertx/ActiveMQVertxLogger.java | 2 +- .../vertx/IncomingVertxEventHandler.java | 13 +- .../vertx/OutgoingVertxEventHandler.java | 12 +- .../VertxIncomingConnectorServiceFactory.java | 2 +- .../VertxOutgoingConnectorServiceFactory.java | 2 +- .../vertx/ActiveMQVertxUnitTest.java | 209 ++++++++---------- 6 files changed, 104 insertions(+), 136 deletions(-) diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java index e4a0382904..1f30c4c5fa 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java @@ -38,7 +38,7 @@ import org.jboss.logging.annotations.MessageLogger; * so an INFO message would be 191000 to 191999 */ @MessageLogger(projectCode = "AMQ") -public interface ActiveMQVertxLogger extends BasicLogger { +interface ActiveMQVertxLogger extends BasicLogger { /** * The vertx logger. diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java index a321820de2..49be35c129 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.integration.vertx; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; @@ -40,7 +39,7 @@ import org.vertx.java.platform.PlatformLocator; import org.vertx.java.platform.PlatformManager; import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; -public class IncomingVertxEventHandler implements ConnectorService { +class IncomingVertxEventHandler implements ConnectorService { private final String connectorName; @@ -68,11 +67,10 @@ public class IncomingVertxEventHandler implements ConnectorService { private boolean isStarted = false; - public IncomingVertxEventHandler(String connectorName, - Map configuration, - StorageManager storageManager, - PostOffice postOffice, - ScheduledExecutorService scheduledThreadPool) { + IncomingVertxEventHandler(String connectorName, + Map configuration, + StorageManager storageManager, + PostOffice postOffice) { this.connectorName = connectorName; this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); @@ -271,7 +269,6 @@ public class IncomingVertxEventHandler implements ConnectorService { else if (body instanceof ReplyException) { return VertxConstants.TYPE_REPLY_FAILURE; } - throw new IllegalArgumentException("Type not supported: " + message); } diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java index 2651adf2a8..6c50b260ef 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java @@ -18,12 +18,10 @@ package org.apache.activemq.artemis.integration.vertx; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ConnectorService; @@ -43,7 +41,7 @@ import org.vertx.java.platform.PlatformLocator; import org.vertx.java.platform.PlatformManager; import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; -public class OutgoingVertxEventHandler implements Consumer, ConnectorService { +class OutgoingVertxEventHandler implements Consumer, ConnectorService { private final String connectorName; @@ -73,11 +71,7 @@ public class OutgoingVertxEventHandler implements Consumer, ConnectorService { private boolean isStarted = false; - public OutgoingVertxEventHandler(String connectorName, - Map configuration, - StorageManager storageManager, - PostOffice postOffice, - ScheduledExecutorService scheduledThreadPool) { + OutgoingVertxEventHandler(String connectorName, Map configuration, PostOffice postOffice) { this.connectorName = connectorName; this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration); this.postOffice = postOffice; @@ -164,7 +158,7 @@ public class OutgoingVertxEventHandler implements Consumer, ConnectorService { ServerMessage message = ref.getMessage(); - Object vertxMsgBody = null; + Object vertxMsgBody; // extract information from message Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE); diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java index 58d20e800f..03afe20ced 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java @@ -34,7 +34,7 @@ public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFac PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) { - return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool); + return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice); } diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java index 7c92bc7f0b..2ae08488e6 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java @@ -33,7 +33,7 @@ public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFac StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) { - return new OutgoingVertxEventHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool); + return new OutgoingVertxEventHandler(connectorName, configuration, postOffice); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java index f547316bae..5c667edb1f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java @@ -52,27 +52,27 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; */ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { - protected PlatformManager vertxManager; - protected ActiveMQServer server; + private PlatformManager vertxManager; + private ActiveMQServer server; - protected String host = "localhost"; - protected String port = "0"; + private String host = "localhost"; + private String port = "0"; - protected String incomingQueue1 = "vertxTestIncomingQueue1"; - protected String incomingVertxAddress1 = "org.apache.activemq.test.incoming1"; + private String incomingQueue1 = "vertxTestIncomingQueue1"; + private String incomingVertxAddress1 = "org.apache.activemq.test.incoming1"; //outgoing using send - protected String inOutQueue1 = "vertxTestInOutQueue1"; - protected String incomingVertxAddress2 = "org.apache.activemq.test.incoming2"; - protected String outgoingVertxAddress1 = "org.apache.activemq.test.outgoing1"; + private String inOutQueue1 = "vertxTestInOutQueue1"; + private String incomingVertxAddress2 = "org.apache.activemq.test.incoming2"; + private String outgoingVertxAddress1 = "org.apache.activemq.test.outgoing1"; //outgoing using publish - protected String inOutQueue2 = "vertxTestInOutQueue2"; - protected String incomingVertxAddress3 = "org.apache.activemq.test.incoming3"; - protected String outgoingVertxAddress2 = "org.apache.activemq.test.outgoing2"; + private String inOutQueue2 = "vertxTestInOutQueue2"; + private String incomingVertxAddress3 = "org.apache.activemq.test.incoming3"; + private String outgoingVertxAddress2 = "org.apache.activemq.test.outgoing2"; // Vertx is changing the classLoader to null.. this will preserve the original classloader - ClassLoader contextClassLoader; + private ClassLoader contextClassLoader; //subclasses may override this method //in order to get a server with different connector services @@ -84,53 +84,28 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { super.setUp(); //all queues - CoreQueueConfiguration qc1 = new CoreQueueConfiguration().setAddress(incomingQueue1).setName(incomingQueue1); - CoreQueueConfiguration qc2 = new CoreQueueConfiguration().setAddress(inOutQueue1).setName(inOutQueue1); - CoreQueueConfiguration qc3 = new CoreQueueConfiguration().setAddress(inOutQueue2).setName(inOutQueue2); + CoreQueueConfiguration qc1 = createCoreQueueConfiguration(incomingQueue1); + CoreQueueConfiguration qc2 = createCoreQueueConfiguration(inOutQueue1); + CoreQueueConfiguration qc3 = createCoreQueueConfiguration(inOutQueue2); //incoming - HashMap config1 = new HashMap<>(); - config1.put(VertxConstants.HOST, host); - config1.put(VertxConstants.PORT, port); - config1.put(VertxConstants.QUEUE_NAME, incomingQueue1); - config1.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress1); - - ConnectorServiceConfiguration inconf1 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config1).setName("test-vertx-incoming-connector1"); + HashMap config1 = createIncomingConnectionConfig(incomingVertxAddress1, incomingQueue1); + ConnectorServiceConfiguration inconf1 = createIncomingConnectorServiceConfiguration(config1, "test-vertx-incoming-connector1"); //outgoing send style - HashMap config2 = new HashMap<>(); - config2.put(VertxConstants.HOST, host); - config2.put(VertxConstants.PORT, port); - config2.put(VertxConstants.QUEUE_NAME, inOutQueue1); - config2.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress2); + HashMap config2 = createOutgoingConnectionConfig(inOutQueue1, incomingVertxAddress2); + ConnectorServiceConfiguration inconf2 = createIncomingConnectorServiceConfiguration(config2, "test-vertx-incoming-connector2"); - ConnectorServiceConfiguration inconf2 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config2).setName("test-vertx-incoming-connector2"); - - HashMap config3 = new HashMap<>(); - config3.put(VertxConstants.HOST, host); - config3.put(VertxConstants.PORT, port); - config3.put(VertxConstants.QUEUE_NAME, inOutQueue1); - config3.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress1); - - ConnectorServiceConfiguration outconf1 = new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config3).setName("test-vertx-outgoing-connector1"); + HashMap config3 = createOutgoingConnectionConfig(inOutQueue1, outgoingVertxAddress1); + ConnectorServiceConfiguration outconf1 = createOutgoingConnectorServiceConfiguration(config3, "test-vertx-outgoing-connector1"); //outgoing publish style - HashMap config4 = new HashMap<>(); - config4.put(VertxConstants.HOST, host); - config4.put(VertxConstants.PORT, port); - config4.put(VertxConstants.QUEUE_NAME, inOutQueue2); - config4.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress3); + HashMap config4 = createOutgoingConnectionConfig(inOutQueue2, incomingVertxAddress3); + ConnectorServiceConfiguration inconf3 = createIncomingConnectorServiceConfiguration(config4, "test-vertx-incoming-connector3"); - ConnectorServiceConfiguration inconf3 = new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config4).setName("test-vertx-incoming-connector3"); - - HashMap config5 = new HashMap<>(); - config5.put(VertxConstants.HOST, host); - config5.put(VertxConstants.PORT, port); - config5.put(VertxConstants.QUEUE_NAME, inOutQueue2); - config5.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress2); + HashMap config5 = createOutgoingConnectionConfig(inOutQueue2, outgoingVertxAddress2); config5.put(VertxConstants.VERTX_PUBLISH, "true"); - - ConnectorServiceConfiguration outconf2 = new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config5).setName("test-vertx-outgoing-connector2"); + ConnectorServiceConfiguration outconf2 = createOutgoingConnectorServiceConfiguration(config5, "test-vertx-outgoing-connector2"); Configuration configuration = createDefaultInVMConfig().addQueueConfiguration(qc1).addQueueConfiguration(qc2).addQueueConfiguration(qc3).addConnectorServiceConfiguration(inconf1).addConnectorServiceConfiguration(inconf2).addConnectorServiceConfiguration(outconf1).addConnectorServiceConfiguration(inconf3).addConnectorServiceConfiguration(outconf2); @@ -140,8 +115,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { /** * (vertx events) ===> (incomingQueue1) ===> (activemq consumer) - * - * @throws Exception */ @Test public void testIncomingEvents() throws Exception { @@ -211,7 +184,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { } //send a byte - Byte aByte = new Byte((byte) 15); + Byte aByte = (byte) 15; vertx.eventBus().send(incomingVertxAddress1, aByte); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -222,7 +195,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aByte, recvByte); //send a Character - Character aChar = new Character('a'); + Character aChar = 'a'; vertx.eventBus().send(incomingVertxAddress1, aChar); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -232,7 +205,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aChar, recvChar); //send a Double - Double aDouble = new Double(1234.56d); + Double aDouble = 1234.56d; vertx.eventBus().send(incomingVertxAddress1, aDouble); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -242,7 +215,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aDouble, recvDouble); //send a Float - Float aFloat = new Float(1234.56f); + Float aFloat = 1234.56f; vertx.eventBus().send(incomingVertxAddress1, aFloat); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -252,7 +225,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aFloat, recvFloat); //send an Integer - Integer aInt = new Integer(1234); + Integer aInt = 1234; vertx.eventBus().send(incomingVertxAddress1, aInt); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -262,7 +235,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aInt, recvInt); //send a Long - Long aLong = new Long(12345678L); + Long aLong = 12345678L; vertx.eventBus().send(incomingVertxAddress1, aLong); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -272,7 +245,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { assertEquals(aLong, recvLong); //send a Short - Short aShort = new Short((short) 321); + Short aShort = (short) 321; vertx.eventBus().send(incomingVertxAddress1, aShort); msg = receiveFromQueue(incomingQueue1); assertNotNull(msg); @@ -339,25 +312,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { recvJsonString = msg.getBodyBuffer().readString(); System.out.println("==== received json: " + recvJsonString); assertEquals(aJsonArray, new JsonArray(recvJsonString)); - - //send a ReplyFailure - /* - ReplyFailure replyFailure = ReplyFailure.TIMEOUT; - int fakeFailureCode = 1234; - String failureMsg = "Test failure message"; - ReplyException aReplyEx = new ReplyException(replyFailure, fakeFailureCode, failureMsg); - vertx.eventBus().send(incomingVertxAddress1, aReplyEx); - msg = receiveFromQueue(incomingQueue1); - assertNotNull(msg); - vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE); - assertEquals(VertxConstants.TYPE_REPLY_FAILURE, vertxType); - int recvType = msg.getBodyBuffer().readInt(); - int recvCode = msg.getBodyBuffer().readInt(); - String recvFailureMsg = msg.getBodyBuffer().readString(); - assertEquals(replyFailure.toInt(), recvType); - assertEquals(fakeFailureCode, recvCode); - assertEquals(failureMsg, recvFailureMsg); - */ } /** @@ -365,8 +319,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { * ===> (inOutQueue1) * ===> (outgoing handler) * ===> send to vertx (outgoingVertxAddress1) - * - * @throws Exception */ @Test public void testOutgoingEvents() throws Exception { @@ -402,43 +354,43 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { handler.checkByteArrayMessageReceived(byteArray); //send a byte - Byte aByte = new Byte((byte) 15); + Byte aByte = (byte) 15; vertx.eventBus().send(incomingVertxAddress2, aByte); handler.checkByteMessageReceived(aByte); //send a Character - Character aChar = new Character('a'); + Character aChar = 'a'; vertx.eventBus().send(incomingVertxAddress2, aChar); handler.checkCharacterMessageReceived(aChar); //send a Double - Double aDouble = new Double(1234.56d); + Double aDouble = 1234.56d; vertx.eventBus().send(incomingVertxAddress2, aDouble); handler.checkDoubleMessageReceived(aDouble); //send a Float - Float aFloat = new Float(1234.56f); + Float aFloat = 1234.56f; vertx.eventBus().send(incomingVertxAddress2, aFloat); handler.checkFloatMessageReceived(aFloat); //send an Integer - Integer aInt = new Integer(1234); + Integer aInt = 1234; vertx.eventBus().send(incomingVertxAddress2, aInt); handler.checkIntegerMessageReceived(aInt); //send a Long - Long aLong = new Long(12345678L); + Long aLong = 12345678L; vertx.eventBus().send(incomingVertxAddress2, aLong); handler.checkLongMessageReceived(aLong); //send a Short - Short aShort = new Short((short) 321); + Short aShort = (short) 321; vertx.eventBus().send(incomingVertxAddress2, aShort); handler.checkShortMessageReceived(aShort); @@ -496,8 +448,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { * ===> (inOutQueue2) * ===> (outgoing handler) * ===> public to vertx (outgoingVertxAddress2) - * - * @throws Exception */ @Test public void testOutgoingEvents2() throws Exception { @@ -539,49 +489,49 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { handler2.checkByteArrayMessageReceived(byteArray); //send a byte - Byte aByte = new Byte((byte) 15); + Byte aByte = (byte) 15; vertx.eventBus().send(incomingVertxAddress3, aByte); handler1.checkByteMessageReceived(aByte); handler2.checkByteMessageReceived(aByte); //send a Character - Character aChar = new Character('a'); + Character aChar = 'a'; vertx.eventBus().send(incomingVertxAddress3, aChar); handler1.checkCharacterMessageReceived(aChar); handler2.checkCharacterMessageReceived(aChar); //send a Double - Double aDouble = new Double(1234.56d); + Double aDouble = 1234.56d; vertx.eventBus().send(incomingVertxAddress3, aDouble); handler1.checkDoubleMessageReceived(aDouble); handler2.checkDoubleMessageReceived(aDouble); //send a Float - Float aFloat = new Float(1234.56f); + Float aFloat = 1234.56f; vertx.eventBus().send(incomingVertxAddress3, aFloat); handler1.checkFloatMessageReceived(aFloat); handler2.checkFloatMessageReceived(aFloat); //send an Integer - Integer aInt = new Integer(1234); + Integer aInt = 1234; vertx.eventBus().send(incomingVertxAddress3, aInt); handler1.checkIntegerMessageReceived(aInt); handler2.checkIntegerMessageReceived(aInt); //send a Long - Long aLong = new Long(12345678L); + Long aLong = 12345678L; vertx.eventBus().send(incomingVertxAddress3, aLong); handler1.checkLongMessageReceived(aLong); handler2.checkLongMessageReceived(aLong); //send a Short - Short aShort = new Short((short) 321); + Short aShort = (short) 321; vertx.eventBus().send(incomingVertxAddress3, aShort); handler1.checkShortMessageReceived(aShort); @@ -641,9 +591,7 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { private ClientMessage receiveFromQueue(String queueName) throws Exception { ClientMessage msg = null; - try (ServerLocator locator = createInVMNonHALocator(); - ClientSessionFactory sf = createSessionFactory(locator); - ClientSession session = sf.createSession(false, true, true)) { + try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory sf = createSessionFactory(locator); ClientSession session = sf.createSession(false, true, true)) { ClientConsumer consumer = session.createConsumer(queueName); session.start(); @@ -657,9 +605,6 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { private void createVertxService() { System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName()); vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port), host); - - // vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port), - // host, quorumSize, haGroup + System.currentTimeMillis()); } private class VertxTestHandler implements Handler> { @@ -675,61 +620,61 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { } } - public void checkJsonArrayMessageReceived(JsonArray aJsonArray) { + void checkJsonArrayMessageReceived(JsonArray aJsonArray) { BaseMessage msg = waitMessage(); JsonArray body = (JsonArray) msg.body(); assertEquals(aJsonArray, body); } - public void checkJsonObjectMessageReceived(final JsonObject aJsonObj) { + void checkJsonObjectMessageReceived(final JsonObject aJsonObj) { BaseMessage msg = waitMessage(); JsonObject body = (JsonObject) msg.body(); assertEquals(aJsonObj, body); } - public void checkShortMessageReceived(final Short aShort) { + void checkShortMessageReceived(final Short aShort) { BaseMessage msg = waitMessage(); Short body = (Short) msg.body(); assertEquals(aShort, body); } - public void checkLongMessageReceived(final Long aLong) { + void checkLongMessageReceived(final Long aLong) { BaseMessage msg = waitMessage(); Long body = (Long) msg.body(); assertEquals(aLong, body); } - public void checkIntegerMessageReceived(final Integer aInt) { + void checkIntegerMessageReceived(final Integer aInt) { BaseMessage msg = waitMessage(); Integer body = (Integer) msg.body(); assertEquals(aInt, body); } - public void checkFloatMessageReceived(final Float aFloat) { + void checkFloatMessageReceived(final Float aFloat) { BaseMessage msg = waitMessage(); Float body = (Float) msg.body(); assertEquals(aFloat, body); } - public void checkDoubleMessageReceived(final Double aDouble) { + void checkDoubleMessageReceived(final Double aDouble) { BaseMessage msg = waitMessage(); Double body = (Double) msg.body(); assertEquals(aDouble, body); } - public void checkCharacterMessageReceived(final Character aChar) { + void checkCharacterMessageReceived(final Character aChar) { BaseMessage msg = waitMessage(); Character body = (Character) msg.body(); assertEquals(aChar, body); } - public void checkByteMessageReceived(final Byte aByte) { + void checkByteMessageReceived(final Byte aByte) { BaseMessage msg = waitMessage(); Byte body = (Byte) msg.body(); assertEquals(aByte, body); } - public void checkByteArrayMessageReceived(final byte[] byteArray) { + void checkByteArrayMessageReceived(final byte[] byteArray) { BaseMessage msg = waitMessage(); byte[] body = (byte[]) msg.body(); assertEquals(byteArray.length, body.length); @@ -738,19 +683,19 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { } } - public void checkBooleanMessageReceived(final Boolean boolValue) { + void checkBooleanMessageReceived(final Boolean boolValue) { BaseMessage msg = waitMessage(); Boolean body = (Boolean) msg.body(); assertEquals(boolValue, body); } - public void checkStringMessageReceived(final String str) { + void checkStringMessageReceived(final String str) { BaseMessage msg = waitMessage(); String body = (String) msg.body(); assertEquals(str, body); } - public void checkBufferMessageReceived(final Buffer buffer) { + void checkBufferMessageReceived(final Buffer buffer) { byte[] source = buffer.getBytes(); BaseMessage msg = waitMessage(); Buffer body = (Buffer) msg.body(); @@ -792,4 +737,36 @@ public class ActiveMQVertxUnitTest extends ActiveMQTestBase { Thread.currentThread().setContextClassLoader(contextClassLoader); super.tearDown(); } + + private CoreQueueConfiguration createCoreQueueConfiguration(String queueName) { + return new CoreQueueConfiguration().setAddress(queueName).setName(queueName); + } + + private ConnectorServiceConfiguration createOutgoingConnectorServiceConfiguration(HashMap config, + String name) { + return new ConnectorServiceConfiguration().setFactoryClassName(VertxOutgoingConnectorServiceFactory.class.getName()).setParams(config).setName(name); + } + + private ConnectorServiceConfiguration createIncomingConnectorServiceConfiguration(HashMap config, + String name) { + return new ConnectorServiceConfiguration().setFactoryClassName(VertxIncomingConnectorServiceFactory.class.getName()).setParams(config).setName(name); + } + + private HashMap createIncomingConnectionConfig(String vertxAddress, String incomingQueue) { + HashMap config1 = new HashMap<>(); + config1.put(VertxConstants.HOST, host); + config1.put(VertxConstants.PORT, port); + config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress); + config1.put(VertxConstants.QUEUE_NAME, incomingQueue); + return config1; + } + + private HashMap createOutgoingConnectionConfig(String queueName, String vertxAddress) { + HashMap config1 = new HashMap<>(); + config1.put(VertxConstants.HOST, host); + config1.put(VertxConstants.PORT, port); + config1.put(VertxConstants.QUEUE_NAME, queueName); + config1.put(VertxConstants.VERTX_ADDRESS, vertxAddress); + return config1; + } }