From 301244740409a42ac36a4ba90c2a18e0aa2a6cdb Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 20 Apr 2016 11:37:22 +0800 Subject: [PATCH] ARTEMIS-488 Fix OpenWire Test (Temp Queue removal and others) Temp Queue not deleted when connection is closed. Enable Stomp in openwire test because some test uses it. Remove unused code in opwnwire Wrong XA error code returned when xid is missing (ActiveMQXAConnectionFactory.testRollbackXaErrorCode) regression in ActiveMQSslConnectionFactoryTest (SSL related) --- .../protocol/openwire/OpenWireConnection.java | 26 +--- .../protocol/openwire/amq/AMQSession.java | 8 +- .../artemis/core/postoffice/PostOffice.java | 24 +-- .../core/postoffice/RoutingStatus.java | 27 ++++ .../core/postoffice/impl/PostOfficeImpl.java | 41 +++--- .../artemis/core/server/ServerSession.java | 5 +- .../core/server/impl/ServerSessionImpl.java | 27 +++- tests/activemq5-unit-tests/pom.xml | 18 +++ .../apache/activemq/broker/BrokerService.java | 97 +++--------- .../activemq/broker/SslBrokerService.java | 7 - .../artemiswrapper/ArtemisBrokerWrapper.java | 6 +- .../transport/tcp/TcpTransportFactory.java | 1 + ...s.spi.core.protocol.ProtocolManagerFactory | 2 - .../ActiveMQSslConnectionFactoryTest.java | 2 +- .../activemq/JmsTempDestinationTest.java | 138 ++++++------------ .../transport/SoWriteTimeoutTest.java | 12 +- .../server/impl/fakes/FakePostOffice.java | 39 ++--- 17 files changed, 220 insertions(+), 260 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java delete mode 100644 tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 818d3051a4..a9cbf0c234 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -71,7 +71,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -151,8 +150,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private ConnectionState state; - private final Set tempQueues = new ConcurrentHashSet<>(); - /** * Openwire doesn't sen transactions associated with any sessions. * It will however send beingTX / endTX as it would be doing it with XA Transactions. @@ -289,7 +286,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se response.setCorrelationId(commandId); dispatchSync(response); } - } } catch (Exception e) { @@ -512,10 +508,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.wireFormat; } - public void registerTempQueue(ActiveMQDestination queue) { - tempQueues.add(queue); - } - private void shutdown(boolean fail) { if (fail) { transportConnection.forceClose(); @@ -692,19 +684,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se SimpleString qName = OpenWireUtil.toCoreAddress(dest); QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (binding == null) { - if (getState().getInfo() != null) { - + if (dest.isTemporary()) { + internalSession.createQueue(qName, qName, null, dest.isTemporary(), false); + } + else { + ConnectionInfo connInfo = getState().getInfo(); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; server.getSecurityStore().check(qName, checkType, this); - server.checkQueueCreationLimit(getUsername()); - } - ConnectionInfo connInfo = getState().getInfo(); - server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); - } + server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, false); - if (dest.isTemporary()) { - registerTempQueue(dest); + } } } @@ -1407,7 +1397,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } if (transaction == null) { - throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid); + return null; } if (session != null && transaction.getProtocolData() != session) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 563608d1f2..218e0dfcc9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; +import javax.jms.InvalidDestinationException; import javax.jms.ResourceAllocationException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; @@ -361,7 +363,11 @@ public class AMQSession implements SessionCallback { connection.getTransportConnection().setAutoRead(false); } - getCoreSession().send(coreMsg, false); + RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); + + if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { + throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]); + } if (runToUse != null) { // if the timeout is >0, it will wait this much milliseconds diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index d07ea5b814..c6b0e9eb69 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -68,26 +68,26 @@ public interface PostOffice extends ActiveMQComponent { Map getAllBindings(); - void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; - void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; - void route(ServerMessage message, + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; - void route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, + QueueCreator queueCreator, + RoutingContext context, + boolean direct) throws Exception; - void route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception; + RoutingStatus route(ServerMessage message, + QueueCreator queueCreator, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws Exception; MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java new file mode 100644 index 0000000000..a46a4f2f8f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice; + +/** + * Used to indicate the result of a server send + */ +public enum RoutingStatus { + OK, + NO_BINDINGS, + NO_BINDINGS_DLA, + DUPLICATED_ID; +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 659c6126f6..53857821ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueInfo; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -570,41 +571,42 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { - route(message, queueCreator, (Transaction) null, direct); + public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { + return route(message, queueCreator, (Transaction) null, direct); } @Override - public void route(final ServerMessage message, + public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception { - route(message, queueCreator, new RoutingContextImpl(tx), direct); + return route(message, queueCreator, new RoutingContextImpl(tx), direct); } @Override - public void route(final ServerMessage message, + public RoutingStatus route(final ServerMessage message, final QueueCreator queueCreator, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { - route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); + return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); } @Override - public void route(final ServerMessage message, - final QueueCreator queueCreator, - final RoutingContext context, - final boolean direct) throws Exception { - route(message, queueCreator, context, direct, true); + public RoutingStatus route(final ServerMessage message, + final QueueCreator queueCreator, + final RoutingContext context, + final boolean direct) throws Exception { + return route(message, queueCreator, context, direct, true); } @Override - public void route(final ServerMessage message, - final QueueCreator queueCreator, - final RoutingContext context, - final boolean direct, - boolean rejectDuplicates) throws Exception { + public RoutingStatus route(final ServerMessage message, + final QueueCreator queueCreator, + final RoutingContext context, + final boolean direct, + boolean rejectDuplicates) throws Exception { + RoutingStatus result = RoutingStatus.OK; // Sanity check if (message.getRefCount() > 0) { throw new IllegalStateException("Message cannot be routed more than once"); @@ -619,7 +621,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding applyExpiryDelay(message, address); if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) { - return; + return RoutingStatus.DUPLICATED_ID; } if (message.hasInternalProperties()) { @@ -671,6 +673,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (dlaAddress == null) { + result = RoutingStatus.NO_BINDINGS; ActiveMQServerLogger.LOGGER.noDLA(address); } else { @@ -679,9 +682,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); route(message, null, context.getTransaction(), false); + result = RoutingStatus.NO_BINDINGS_DLA; } } else { + result = RoutingStatus.NO_BINDINGS; + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); } @@ -709,6 +715,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (startedTX.get()) { context.getTransaction().commit(); } + return result; } // HORNETQ-1029 diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index b7a7c47af1..7378b5a4b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -132,7 +133,9 @@ public interface ServerSession extends SecurityAuth { void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception; - void send(ServerMessage message, boolean direct) throws Exception; + RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception; + + RoutingStatus send(ServerMessage message, boolean direct) throws Exception; void sendLarge(MessageInternal msg) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index bccf992421..8203a09cba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.CheckType; @@ -1213,7 +1214,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public void send(final ServerMessage message, final boolean direct) throws Exception { + public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception { + return send(message, direct, false); + } + + @Override + public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception { + RoutingStatus result = RoutingStatus.OK; //large message may come from StompSession directly, in which //case the id header already generated. if (!message.isLargeMessage()) { @@ -1256,8 +1263,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { handleManagementMessage(message, direct); } else { - doSend(message, direct); + result = doSend(message, direct, noAutoCreateQueue); } + return result; } @Override @@ -1281,7 +1289,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); } - doSend(currentLargeMessage, false); + doSend(currentLargeMessage, false, false); currentLargeMessage = null; } @@ -1479,7 +1487,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (replyTo != null) { reply.setAddress(replyTo); - doSend(reply, direct); + doSend(reply, direct, false); } } @@ -1535,7 +1543,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { theTx.rollback(); } - protected void doSend(final ServerMessage msg, final boolean direct) throws Exception { + protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception { + RoutingStatus result = RoutingStatus.OK; // check the user has write access to this address. try { securityCheck(msg.getAddress(), CheckType.SEND, this); @@ -1554,7 +1563,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } try { - postOffice.route(msg, queueCreator, routingContext, direct); + if (noAutoCreateQueue) { + result = postOffice.route(msg, null, routingContext, direct); + } + else { + result = postOffice.route(msg, queueCreator, routingContext, direct); + } Pair value = targetAddressInfos.get(msg.getAddress()); @@ -1569,6 +1583,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { finally { routingContext.clear(); } + return result; } @Override diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index 0a4bdb43e3..b058f841bc 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -313,6 +313,19 @@ artemis-openwire-protocol ${project.version} + + + org.apache.activemq + artemis-stomp-protocol + ${project.version} + + + + org.apache.activemq + artemis-stomp-protocol + ${project.version} + + org.jboss.byteman byteman @@ -422,6 +435,11 @@ maven-surefire-plugin ${skipActiveMQ5Tests} + **/org/apache/activemq/*Test.java diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index 224498ed0b..7da162da0c 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -84,12 +84,6 @@ public class BrokerService implements Service { public static final long DEFAULT_START_TIMEOUT = 600000L; public static boolean disableWrapper = false; - public String SERVER_SIDE_KEYSTORE; - public String KEYSTORE_PASSWORD; - public String SERVER_SIDE_TRUSTSTORE; - public String TRUSTSTORE_PASSWORD; - public String storeType; - private SslContext sslContext; private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); @@ -151,7 +145,6 @@ public class BrokerService implements Service { tmpfolder = new TemporaryFolder(targetTmp); tmpfolder.create(); Exception e = new Exception(); - e.fillInStackTrace(); startBroker(startAsync); map.put(broker, e); } @@ -261,10 +254,6 @@ public class BrokerService implements Service { } } - public boolean enableSsl() { - return this.SERVER_SIDE_KEYSTORE != null; - } - //below are methods called directly by tests //we don't actually implement any of these for now, //just to make test compile pass. @@ -500,23 +489,11 @@ public class BrokerService implements Service { public void setTransportConnectors(List transportConnectors) throws Exception { this.transportConnectors = transportConnectors; for (TransportConnector connector : transportConnectors) { - if (connector.getUri().getScheme().equals("ssl")) { - boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort(), true)); - if (added) { - System.out.println("added ssl connector " + connector); - } - else { - System.out.println("WARNing! failed to add ssl connector: " + connector); - } + if (sslContext instanceof SpringSslContext) { + this.extraConnectors.add(new ConnectorInfo(connector.getUri(), (SpringSslContext)sslContext)); } else { - boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort())); - if (added) { - System.out.println("added connector " + connector); - } - else { - System.out.println("WARNing! failed to add connector: " + connector); - } + this.extraConnectors.add(new ConnectorInfo(connector.getUri())); } } } @@ -584,7 +561,12 @@ public class BrokerService implements Service { connector = new FakeTransportConnector(bindAddress); this.transportConnectors.add(connector); - this.extraConnectors.add(new ConnectorInfo(bindAddress)); + if (sslContext instanceof SpringSslContext) { + this.extraConnectors.add(new ConnectorInfo(bindAddress, (SpringSslContext) sslContext)); + } + else { + this.extraConnectors.add(new ConnectorInfo(bindAddress)); + } return connector; } @@ -738,14 +720,6 @@ public class BrokerService implements Service { public void setSslContext(SslContext sslContext) { this.sslContext = sslContext; - if (sslContext instanceof SpringSslContext) { - SpringSslContext springContext = (SpringSslContext)sslContext; - this.SERVER_SIDE_KEYSTORE = springContext.getKeyStore(); - this.KEYSTORE_PASSWORD = springContext.getKeyStorePassword(); - this.SERVER_SIDE_TRUSTSTORE = springContext.getTrustStore(); - this.TRUSTSTORE_PASSWORD = springContext.getTrustStorePassword(); - this.storeType = springContext.getKeyStoreType(); - } } public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { @@ -807,48 +781,15 @@ public class BrokerService implements Service { public URI uri; public boolean ssl; - public String keyStore; - public String keyStorePassword; - public String keyStoreType; - - public String trustStore; - public String trustStorePassword; - public String trustStoreType; - public boolean clientAuth; - public ConnectorInfo(int port) throws URISyntaxException { - this(port, false); - } - - public ConnectorInfo(int port, boolean ssl) throws URISyntaxException { - this(port, ssl, false); - } - - public ConnectorInfo(int port, boolean ssl, boolean clientAuth) throws URISyntaxException { - this.ssl = ssl; - if (port == 0) { - port = getPseudoRandomPort(); - } - - String baseUri = "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE"; - if (ssl) { - baseUri = baseUri + "&" + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&" - + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&" - + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType; - if (clientAuth) { - baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&" - + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&" - + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&" - + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType; - } - } - this.uri = new URI(baseUri); + public ConnectorInfo(URI bindAddress) throws URISyntaxException { + this(bindAddress, null); } //bindAddress must be Artemis compliant, except //scheme - public ConnectorInfo(URI bindAddress) throws URISyntaxException { + public ConnectorInfo(URI bindAddress, SpringSslContext context) throws URISyntaxException { Integer port = bindAddress.getPort(); String host = bindAddress.getHost(); @@ -870,16 +811,16 @@ public class BrokerService implements Service { host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment()); } else { - String baseUri = "tcp://" + host + ":" + port + "?protocols=OPENWIRE,CORE&" + String baseUri = "tcp://" + host + ":" + port + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=true&" - + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&" - + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&" - + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType; + + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultKeyStore : context.getKeyStore()) + "&" + + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultKeyStorePassword : context.getKeyStorePassword()) + "&" + + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultKeyStoreType : context.getKeyStoreType()); if (clientAuth) { baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&" - + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&" - + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&" - + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType; + + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultTrustStore : context.getTrustStore()) + "&" + + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultTrustStorePassword : context.getTrustStorePassword()) + "&" + + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultTrustStoreType : context.getTrustStoreType()); } uri = new URI(baseUri); } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java index b91d328b48..2f36cf62d1 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java @@ -56,11 +56,4 @@ public class SslBrokerService extends BrokerService { SecureRandom random) throws IOException, KeyManagementException { return null; } - - //one way - public void setupSsl(String keystoreType, String password, String serverKeystore) { - this.SERVER_SIDE_KEYSTORE = serverKeystore; - this.KEYSTORE_PASSWORD = password; - this.storeType = keystoreType; - } } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 61c6d8767b..17d81d76cb 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -90,11 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { commonSettings.setAutoCreateJmsQueues(true); if (bservice.extraConnectors.size() == 0) { - serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE"); - } - if (this.bservice.enableSsl()) { - //default - addServerAcceptor(serverConfig, new BrokerService.ConnectorInfo(61611, true)); + serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616"); } for (BrokerService.ConnectorInfo info : bservice.extraConnectors) { diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index c0ed126bca..e06001ae00 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -78,6 +78,7 @@ public class TcpTransportFactory extends TransportFactory { params.remove("broker.useJmx"); params.remove("marshal"); params.remove("create"); + params.remove("asyncQueueDepth"); URI location2 = URISupport.createRemainingURI(location, params); return super.doConnect(location2); } diff --git a/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory deleted file mode 100644 index c76e40e79c..0000000000 --- a/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory +++ /dev/null @@ -1,2 +0,0 @@ -org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory - diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java index e85f6a87c0..df71ec500e 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java @@ -204,7 +204,7 @@ public class ActiveMQSslConnectionFactoryTest extends CombinationTestSupport { SslBrokerService service = new SslBrokerService(); service.setPersistent(false); - service.setupSsl(KEYSTORE_TYPE, PASSWORD, SERVER_KEYSTORE); + service.addConnector(uri); service.start(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index 9d04bf1022..866ab63ae5 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -16,15 +16,11 @@ */ package org.apache.activemq; -import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -39,27 +35,27 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import junit.framework.TestCase; - -import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @version */ -public class JmsTempDestinationTest extends TestCase { +public class JmsTempDestinationTest { private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class); private Connection connection; private ActiveMQConnectionFactory factory; protected List connections = Collections.synchronizedList(new ArrayList()); - @Override - protected void setUp() throws Exception { - factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + @Before + public void setUp() throws Exception { + factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setAlwaysSyncSend(true); connection = factory.createConnection(); connections.add(connection); @@ -68,8 +64,8 @@ public class JmsTempDestinationTest extends TestCase { /** * @see junit.framework.TestCase#tearDown() */ - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { for (Iterator iter = connections.iterator(); iter.hasNext(); ) { Connection conn = iter.next(); try { @@ -86,6 +82,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempDestOnlyConsumedByLocalConn() throws JMSException { connection.start(); @@ -103,22 +100,22 @@ public class JmsTempDestinationTest extends TestCase { TemporaryQueue otherQueue = otherSession.createTemporaryQueue(); MessageConsumer consumer = otherSession.createConsumer(otherQueue); Message msg = consumer.receive(3000); - assertNull(msg); + Assert.assertNull(msg); // should throw InvalidDestinationException when consuming a temp // destination from another connection try { consumer = otherSession.createConsumer(queue); - fail("Send should fail since temp destination should be used from another connection"); + Assert.fail("Send should fail since temp destination should be used from another connection"); } catch (InvalidDestinationException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } // should be able to consume temp destination from the same connection consumer = tempSession.createConsumer(queue); msg = consumer.receive(3000); - assertNotNull(msg); + Assert.assertNotNull(msg); } @@ -128,6 +125,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createTemporaryQueue(); @@ -140,9 +138,9 @@ public class JmsTempDestinationTest extends TestCase { producer.send(message); Message message2 = consumer.receive(1000); - assertNotNull(message2); - assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); + Assert.assertNotNull(message2); + Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); + Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); } /** @@ -151,6 +149,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -163,9 +162,9 @@ public class JmsTempDestinationTest extends TestCase { connection.start(); MessageConsumer consumer = session.createConsumer(queue); Message message2 = consumer.receive(3000); - assertNotNull(message2); - assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); + Assert.assertNotNull(message2); + Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); + Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); } @@ -174,6 +173,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTmpQueueWorksUnderLoad() throws JMSException { int count = 500; int dataSize = 1024; @@ -197,9 +197,9 @@ public class JmsTempDestinationTest extends TestCase { MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < count; i++) { Message message2 = consumer.receive(2000); - assertTrue(message2 != null); - assertEquals(i, message2.getIntProperty("c")); - assertTrue(message2.equals(list.get(i))); + Assert.assertTrue(message2 != null); + Assert.assertEquals(i, message2.getIntProperty("c")); + Assert.assertTrue(message2.equals(list.get(i))); } } @@ -211,18 +211,20 @@ public class JmsTempDestinationTest extends TestCase { * @throws InterruptedException * @throws URISyntaxException */ + @Test public void testPublishFailsForClosedConnection() throws Exception { Connection tempConnection = factory.createConnection(); connections.add(tempConnection); - Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TemporaryQueue queue = tempSession.createTemporaryQueue(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = tempSession.createTemporaryQueue(); + final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return activeMQConnection.activeTempDestinations.containsKey(queue); @@ -246,9 +248,10 @@ public class JmsTempDestinationTest extends TestCase { try { message = session.createTextMessage("Hello"); producer.send(message); - fail("Send should fail since temp destination should not exist anymore."); + Assert.fail("Send should fail since temp destination should not exist anymore."); } catch (JMSException e) { + e.printStackTrace(); } } @@ -259,18 +262,23 @@ public class JmsTempDestinationTest extends TestCase { * @throws JMSException * @throws InterruptedException */ + @Test public void testPublishFailsForDestroyedTempDestination() throws Exception { Connection tempConnection = factory.createConnection(); connections.add(tempConnection); - Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TemporaryQueue queue = tempSession.createTemporaryQueue(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); + //In artemis, if you send a message to a topic where the consumer isn't there yet, + //message will get lost. So the create temp queue request has to happen + //after the connection is started (advisory consumer registered). + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = tempSession.createTemporaryQueue(); + final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return activeMQConnection.activeTempDestinations.containsKey(queue); @@ -293,10 +301,10 @@ public class JmsTempDestinationTest extends TestCase { try { message = session.createTextMessage("Hello"); producer.send(message); - fail("Send should fail since temp destination should not exist anymore."); + Assert.fail("Send should fail since temp destination should not exist anymore."); } catch (JMSException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } } @@ -305,6 +313,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testDeleteDestinationWithSubscribersFails() throws JMSException { Connection connection = factory.createConnection(); connections.add(connection); @@ -319,65 +328,14 @@ public class JmsTempDestinationTest extends TestCase { // now closed. try { queue.delete(); - fail("Should fail as Subscribers are active"); + Assert.fail("Should fail as Subscribers are active"); } catch (JMSException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } } + //removed. the original test is only for vm transport. tcp transport will block anyway. public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception { - ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20"); - Connection connection = advisoryConnFactory.createConnection(); - connections.add(connection); - connection.start(); - - final CountDownLatch done = new CountDownLatch(1); - final AtomicBoolean ok = new AtomicBoolean(true); - final AtomicBoolean first = new AtomicBoolean(true); - VMTransport t = ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class); - t.setTransportListener(new TransportListener() { - @Override - public void onCommand(Object command) { - // block first dispatch for a while so broker backs up, but other connection should be able to proceed - if (first.compareAndSet(true, false)) { - try { - ok.set(done.await(35, TimeUnit.SECONDS)); - LOG.info("Done waiting: " + ok.get()); - } - catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - @Override - public void onException(IOException error) { - } - - @Override - public void transportInterupted() { - } - - @Override - public void transportResumed() { - } - }); - - connection = factory.createConnection(); - connections.add(connection); - ((ActiveMQConnection) connection).setWatchTopicAdvisories(false); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - for (int i = 0; i < 2500; i++) { - TemporaryQueue queue = session.createTemporaryQueue(); - MessageConsumer consumer = session.createConsumer(queue); - consumer.close(); - queue.delete(); - } - LOG.info("Done with work: " + ok.get()); - done.countDown(); - assertTrue("ok", ok.get()); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java index 8cac437405..0c0ab301d3 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java @@ -31,6 +31,7 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -56,6 +57,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); adapter.setConcurrentStoreAndDispatchQueues(false); broker.setPersistenceAdapter(adapter); + broker.addConnector("tcp://localhost:61616"); broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000"); if ("nio".equals(brokerTransportScheme)) { broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true"); @@ -73,7 +75,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { messageTextPrefix = initMessagePrefix(8 * 1024); sendMessages(dest, 500); - URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); + URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); LOG.info("consuming using uri: " + tcpBrokerUri); SocketProxy proxy = new SocketProxy(); @@ -104,7 +106,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { messageTextPrefix = initMessagePrefix(8 * 1024); sendMessages(dest, 500); - URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); + URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(2).getConnectUri()); LOG.info("consuming using uri: " + stompBrokerUri); SocketProxy proxy = new SocketProxy(); @@ -121,11 +123,12 @@ public class SoWriteTimeoutTest extends JmsTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:jms.queue." + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); // ensure dispatch has started before pause frame = stompConnection.receiveFrame(); + System.out.println("frame: " + frame); assertTrue(frame.startsWith("MESSAGE")); proxy.pause(); @@ -148,7 +151,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { // verify connection is dead try { for (int i = 0; i < 200; i++) { - stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i); + stompConnection.send("jms.queue." + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i); } fail("expected send to fail with timeout out connection"); } @@ -164,6 +167,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { @Override protected void setUp() throws Exception { + BrokerService.disableWrapper = true; setAutoFail(true); super.setUp(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 4f8a0075ec..aa41a518c0 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -138,34 +139,36 @@ public class FakePostOffice implements PostOffice { } @Override - public void route(ServerMessage message, - QueueCreator creator, - RoutingContext context, - boolean direct) throws Exception { + public RoutingStatus route(ServerMessage message, + QueueCreator creator, + RoutingContext context, + boolean direct) throws Exception { + return RoutingStatus.OK; } @Override - public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception { + public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception { + return RoutingStatus.OK; + } + + @Override + public RoutingStatus route(ServerMessage message, + QueueCreator creator, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws Exception { + return RoutingStatus.OK; } @Override - public void route(ServerMessage message, - QueueCreator creator, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception { - - } - - @Override - public void route(ServerMessage message, + public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception { - + return RoutingStatus.OK; } @Override @@ -173,7 +176,7 @@ public class FakePostOffice implements PostOffice { } @Override - public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception { - + public RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception { + return RoutingStatus.OK; } } \ No newline at end of file