diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 818d3051a4..a9cbf0c234 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -71,7 +71,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -151,8 +150,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private ConnectionState state; - private final Set tempQueues = new ConcurrentHashSet<>(); - /** * Openwire doesn't sen transactions associated with any sessions. * It will however send beingTX / endTX as it would be doing it with XA Transactions. @@ -289,7 +286,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se response.setCorrelationId(commandId); dispatchSync(response); } - } } catch (Exception e) { @@ -512,10 +508,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.wireFormat; } - public void registerTempQueue(ActiveMQDestination queue) { - tempQueues.add(queue); - } - private void shutdown(boolean fail) { if (fail) { transportConnection.forceClose(); @@ -692,19 +684,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se SimpleString qName = OpenWireUtil.toCoreAddress(dest); QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (binding == null) { - if (getState().getInfo() != null) { - + if (dest.isTemporary()) { + internalSession.createQueue(qName, qName, null, dest.isTemporary(), false); + } + else { + ConnectionInfo connInfo = getState().getInfo(); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; server.getSecurityStore().check(qName, checkType, this); - server.checkQueueCreationLimit(getUsername()); - } - ConnectionInfo connInfo = getState().getInfo(); - server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); - } + server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, false); - if (dest.isTemporary()) { - registerTempQueue(dest); + } } } @@ -1407,7 +1397,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } if (transaction == null) { - throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid); + return null; } if (session != null && transaction.getProtocolData() != session) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 563608d1f2..218e0dfcc9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; +import javax.jms.InvalidDestinationException; import javax.jms.ResourceAllocationException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; @@ -361,7 +363,11 @@ public class AMQSession implements SessionCallback { connection.getTransportConnection().setAutoRead(false); } - getCoreSession().send(coreMsg, false); + RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); + + if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { + throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]); + } if (runToUse != null) { // if the timeout is >0, it will wait this much milliseconds diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index d07ea5b814..c6b0e9eb69 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -68,26 +68,26 @@ public interface PostOffice extends ActiveMQComponent { Map getAllBindings(); - void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; - void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; - void route(ServerMessage message, + RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; - void route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, + QueueCreator queueCreator, + RoutingContext context, + boolean direct) throws Exception; - void route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception; + RoutingStatus route(ServerMessage message, + QueueCreator queueCreator, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws Exception; MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java new file mode 100644 index 0000000000..a46a4f2f8f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/RoutingStatus.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice; + +/** + * Used to indicate the result of a server send + */ +public enum RoutingStatus { + OK, + NO_BINDINGS, + NO_BINDINGS_DLA, + DUPLICATED_ID; +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 659c6126f6..53857821ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueInfo; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -570,41 +571,42 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { - route(message, queueCreator, (Transaction) null, direct); + public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { + return route(message, queueCreator, (Transaction) null, direct); } @Override - public void route(final ServerMessage message, + public RoutingStatus route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception { - route(message, queueCreator, new RoutingContextImpl(tx), direct); + return route(message, queueCreator, new RoutingContextImpl(tx), direct); } @Override - public void route(final ServerMessage message, + public RoutingStatus route(final ServerMessage message, final QueueCreator queueCreator, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { - route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); + return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); } @Override - public void route(final ServerMessage message, - final QueueCreator queueCreator, - final RoutingContext context, - final boolean direct) throws Exception { - route(message, queueCreator, context, direct, true); + public RoutingStatus route(final ServerMessage message, + final QueueCreator queueCreator, + final RoutingContext context, + final boolean direct) throws Exception { + return route(message, queueCreator, context, direct, true); } @Override - public void route(final ServerMessage message, - final QueueCreator queueCreator, - final RoutingContext context, - final boolean direct, - boolean rejectDuplicates) throws Exception { + public RoutingStatus route(final ServerMessage message, + final QueueCreator queueCreator, + final RoutingContext context, + final boolean direct, + boolean rejectDuplicates) throws Exception { + RoutingStatus result = RoutingStatus.OK; // Sanity check if (message.getRefCount() > 0) { throw new IllegalStateException("Message cannot be routed more than once"); @@ -619,7 +621,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding applyExpiryDelay(message, address); if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) { - return; + return RoutingStatus.DUPLICATED_ID; } if (message.hasInternalProperties()) { @@ -671,6 +673,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (dlaAddress == null) { + result = RoutingStatus.NO_BINDINGS; ActiveMQServerLogger.LOGGER.noDLA(address); } else { @@ -679,9 +682,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); route(message, null, context.getTransaction(), false); + result = RoutingStatus.NO_BINDINGS_DLA; } } else { + result = RoutingStatus.NO_BINDINGS; + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); } @@ -709,6 +715,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (startedTX.get()) { context.getTransaction().commit(); } + return result; } // HORNETQ-1029 diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index b7a7c47af1..7378b5a4b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -132,7 +133,9 @@ public interface ServerSession extends SecurityAuth { void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception; - void send(ServerMessage message, boolean direct) throws Exception; + RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception; + + RoutingStatus send(ServerMessage message, boolean direct) throws Exception; void sendLarge(MessageInternal msg) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index bccf992421..8203a09cba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.CheckType; @@ -1213,7 +1214,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public void send(final ServerMessage message, final boolean direct) throws Exception { + public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception { + return send(message, direct, false); + } + + @Override + public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception { + RoutingStatus result = RoutingStatus.OK; //large message may come from StompSession directly, in which //case the id header already generated. if (!message.isLargeMessage()) { @@ -1256,8 +1263,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { handleManagementMessage(message, direct); } else { - doSend(message, direct); + result = doSend(message, direct, noAutoCreateQueue); } + return result; } @Override @@ -1281,7 +1289,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); } - doSend(currentLargeMessage, false); + doSend(currentLargeMessage, false, false); currentLargeMessage = null; } @@ -1479,7 +1487,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (replyTo != null) { reply.setAddress(replyTo); - doSend(reply, direct); + doSend(reply, direct, false); } } @@ -1535,7 +1543,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { theTx.rollback(); } - protected void doSend(final ServerMessage msg, final boolean direct) throws Exception { + protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception { + RoutingStatus result = RoutingStatus.OK; // check the user has write access to this address. try { securityCheck(msg.getAddress(), CheckType.SEND, this); @@ -1554,7 +1563,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } try { - postOffice.route(msg, queueCreator, routingContext, direct); + if (noAutoCreateQueue) { + result = postOffice.route(msg, null, routingContext, direct); + } + else { + result = postOffice.route(msg, queueCreator, routingContext, direct); + } Pair value = targetAddressInfos.get(msg.getAddress()); @@ -1569,6 +1583,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { finally { routingContext.clear(); } + return result; } @Override diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index 0a4bdb43e3..b058f841bc 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -313,6 +313,19 @@ artemis-openwire-protocol ${project.version} + + + org.apache.activemq + artemis-stomp-protocol + ${project.version} + + + + org.apache.activemq + artemis-stomp-protocol + ${project.version} + + org.jboss.byteman byteman @@ -422,6 +435,11 @@ maven-surefire-plugin ${skipActiveMQ5Tests} + **/org/apache/activemq/*Test.java diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index 224498ed0b..7da162da0c 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -84,12 +84,6 @@ public class BrokerService implements Service { public static final long DEFAULT_START_TIMEOUT = 600000L; public static boolean disableWrapper = false; - public String SERVER_SIDE_KEYSTORE; - public String KEYSTORE_PASSWORD; - public String SERVER_SIDE_TRUSTSTORE; - public String TRUSTSTORE_PASSWORD; - public String storeType; - private SslContext sslContext; private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); @@ -151,7 +145,6 @@ public class BrokerService implements Service { tmpfolder = new TemporaryFolder(targetTmp); tmpfolder.create(); Exception e = new Exception(); - e.fillInStackTrace(); startBroker(startAsync); map.put(broker, e); } @@ -261,10 +254,6 @@ public class BrokerService implements Service { } } - public boolean enableSsl() { - return this.SERVER_SIDE_KEYSTORE != null; - } - //below are methods called directly by tests //we don't actually implement any of these for now, //just to make test compile pass. @@ -500,23 +489,11 @@ public class BrokerService implements Service { public void setTransportConnectors(List transportConnectors) throws Exception { this.transportConnectors = transportConnectors; for (TransportConnector connector : transportConnectors) { - if (connector.getUri().getScheme().equals("ssl")) { - boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort(), true)); - if (added) { - System.out.println("added ssl connector " + connector); - } - else { - System.out.println("WARNing! failed to add ssl connector: " + connector); - } + if (sslContext instanceof SpringSslContext) { + this.extraConnectors.add(new ConnectorInfo(connector.getUri(), (SpringSslContext)sslContext)); } else { - boolean added = this.extraConnectors.add(new ConnectorInfo(connector.getUri().getPort())); - if (added) { - System.out.println("added connector " + connector); - } - else { - System.out.println("WARNing! failed to add connector: " + connector); - } + this.extraConnectors.add(new ConnectorInfo(connector.getUri())); } } } @@ -584,7 +561,12 @@ public class BrokerService implements Service { connector = new FakeTransportConnector(bindAddress); this.transportConnectors.add(connector); - this.extraConnectors.add(new ConnectorInfo(bindAddress)); + if (sslContext instanceof SpringSslContext) { + this.extraConnectors.add(new ConnectorInfo(bindAddress, (SpringSslContext) sslContext)); + } + else { + this.extraConnectors.add(new ConnectorInfo(bindAddress)); + } return connector; } @@ -738,14 +720,6 @@ public class BrokerService implements Service { public void setSslContext(SslContext sslContext) { this.sslContext = sslContext; - if (sslContext instanceof SpringSslContext) { - SpringSslContext springContext = (SpringSslContext)sslContext; - this.SERVER_SIDE_KEYSTORE = springContext.getKeyStore(); - this.KEYSTORE_PASSWORD = springContext.getKeyStorePassword(); - this.SERVER_SIDE_TRUSTSTORE = springContext.getTrustStore(); - this.TRUSTSTORE_PASSWORD = springContext.getTrustStorePassword(); - this.storeType = springContext.getKeyStoreType(); - } } public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { @@ -807,48 +781,15 @@ public class BrokerService implements Service { public URI uri; public boolean ssl; - public String keyStore; - public String keyStorePassword; - public String keyStoreType; - - public String trustStore; - public String trustStorePassword; - public String trustStoreType; - public boolean clientAuth; - public ConnectorInfo(int port) throws URISyntaxException { - this(port, false); - } - - public ConnectorInfo(int port, boolean ssl) throws URISyntaxException { - this(port, ssl, false); - } - - public ConnectorInfo(int port, boolean ssl, boolean clientAuth) throws URISyntaxException { - this.ssl = ssl; - if (port == 0) { - port = getPseudoRandomPort(); - } - - String baseUri = "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE"; - if (ssl) { - baseUri = baseUri + "&" + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&" - + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&" - + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType; - if (clientAuth) { - baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&" - + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&" - + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&" - + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType; - } - } - this.uri = new URI(baseUri); + public ConnectorInfo(URI bindAddress) throws URISyntaxException { + this(bindAddress, null); } //bindAddress must be Artemis compliant, except //scheme - public ConnectorInfo(URI bindAddress) throws URISyntaxException { + public ConnectorInfo(URI bindAddress, SpringSslContext context) throws URISyntaxException { Integer port = bindAddress.getPort(); String host = bindAddress.getHost(); @@ -870,16 +811,16 @@ public class BrokerService implements Service { host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment()); } else { - String baseUri = "tcp://" + host + ":" + port + "?protocols=OPENWIRE,CORE&" + String baseUri = "tcp://" + host + ":" + port + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=true&" - + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + defaultKeyStore + "&" - + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + defaultKeyStorePassword + "&" - + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + defaultKeyStoreType; + + TransportConstants.KEYSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultKeyStore : context.getKeyStore()) + "&" + + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultKeyStorePassword : context.getKeyStorePassword()) + "&" + + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultKeyStoreType : context.getKeyStoreType()); if (clientAuth) { baseUri = baseUri + "&" + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME + "=true" + "&" - + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + defaultTrustStore + "&" - + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + defaultTrustStorePassword + "&" - + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + defaultTrustStoreType; + + TransportConstants.TRUSTSTORE_PATH_PROP_NAME + "=" + (context == null ? defaultTrustStore : context.getTrustStore()) + "&" + + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME + "=" + (context == null ? defaultTrustStorePassword : context.getTrustStorePassword()) + "&" + + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME + "=" + (context == null ? defaultTrustStoreType : context.getTrustStoreType()); } uri = new URI(baseUri); } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java index b91d328b48..2f36cf62d1 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/SslBrokerService.java @@ -56,11 +56,4 @@ public class SslBrokerService extends BrokerService { SecureRandom random) throws IOException, KeyManagementException { return null; } - - //one way - public void setupSsl(String keystoreType, String password, String serverKeystore) { - this.SERVER_SIDE_KEYSTORE = serverKeystore; - this.KEYSTORE_PASSWORD = password; - this.storeType = keystoreType; - } } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 61c6d8767b..17d81d76cb 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -90,11 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { commonSettings.setAutoCreateJmsQueues(true); if (bservice.extraConnectors.size() == 0) { - serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE"); - } - if (this.bservice.enableSsl()) { - //default - addServerAcceptor(serverConfig, new BrokerService.ConnectorInfo(61611, true)); + serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616"); } for (BrokerService.ConnectorInfo info : bservice.extraConnectors) { diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index c0ed126bca..e06001ae00 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -78,6 +78,7 @@ public class TcpTransportFactory extends TransportFactory { params.remove("broker.useJmx"); params.remove("marshal"); params.remove("create"); + params.remove("asyncQueueDepth"); URI location2 = URISupport.createRemainingURI(location, params); return super.doConnect(location2); } diff --git a/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory deleted file mode 100644 index c76e40e79c..0000000000 --- a/tests/activemq5-unit-tests/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory +++ /dev/null @@ -1,2 +0,0 @@ -org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory - diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java index e85f6a87c0..df71ec500e 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java @@ -204,7 +204,7 @@ public class ActiveMQSslConnectionFactoryTest extends CombinationTestSupport { SslBrokerService service = new SslBrokerService(); service.setPersistent(false); - service.setupSsl(KEYSTORE_TYPE, PASSWORD, SERVER_KEYSTORE); + service.addConnector(uri); service.start(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index 9d04bf1022..866ab63ae5 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -16,15 +16,11 @@ */ package org.apache.activemq; -import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -39,27 +35,27 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import junit.framework.TestCase; - -import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @version */ -public class JmsTempDestinationTest extends TestCase { +public class JmsTempDestinationTest { private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class); private Connection connection; private ActiveMQConnectionFactory factory; protected List connections = Collections.synchronizedList(new ArrayList()); - @Override - protected void setUp() throws Exception { - factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + @Before + public void setUp() throws Exception { + factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setAlwaysSyncSend(true); connection = factory.createConnection(); connections.add(connection); @@ -68,8 +64,8 @@ public class JmsTempDestinationTest extends TestCase { /** * @see junit.framework.TestCase#tearDown() */ - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { for (Iterator iter = connections.iterator(); iter.hasNext(); ) { Connection conn = iter.next(); try { @@ -86,6 +82,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempDestOnlyConsumedByLocalConn() throws JMSException { connection.start(); @@ -103,22 +100,22 @@ public class JmsTempDestinationTest extends TestCase { TemporaryQueue otherQueue = otherSession.createTemporaryQueue(); MessageConsumer consumer = otherSession.createConsumer(otherQueue); Message msg = consumer.receive(3000); - assertNull(msg); + Assert.assertNull(msg); // should throw InvalidDestinationException when consuming a temp // destination from another connection try { consumer = otherSession.createConsumer(queue); - fail("Send should fail since temp destination should be used from another connection"); + Assert.fail("Send should fail since temp destination should be used from another connection"); } catch (InvalidDestinationException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } // should be able to consume temp destination from the same connection consumer = tempSession.createConsumer(queue); msg = consumer.receive(3000); - assertNotNull(msg); + Assert.assertNotNull(msg); } @@ -128,6 +125,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createTemporaryQueue(); @@ -140,9 +138,9 @@ public class JmsTempDestinationTest extends TestCase { producer.send(message); Message message2 = consumer.receive(1000); - assertNotNull(message2); - assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); + Assert.assertNotNull(message2); + Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); + Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); } /** @@ -151,6 +149,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -163,9 +162,9 @@ public class JmsTempDestinationTest extends TestCase { connection.start(); MessageConsumer consumer = session.createConsumer(queue); Message message2 = consumer.receive(3000); - assertNotNull(message2); - assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); + Assert.assertNotNull(message2); + Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); + Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText())); } @@ -174,6 +173,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testTmpQueueWorksUnderLoad() throws JMSException { int count = 500; int dataSize = 1024; @@ -197,9 +197,9 @@ public class JmsTempDestinationTest extends TestCase { MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < count; i++) { Message message2 = consumer.receive(2000); - assertTrue(message2 != null); - assertEquals(i, message2.getIntProperty("c")); - assertTrue(message2.equals(list.get(i))); + Assert.assertTrue(message2 != null); + Assert.assertEquals(i, message2.getIntProperty("c")); + Assert.assertTrue(message2.equals(list.get(i))); } } @@ -211,18 +211,20 @@ public class JmsTempDestinationTest extends TestCase { * @throws InterruptedException * @throws URISyntaxException */ + @Test public void testPublishFailsForClosedConnection() throws Exception { Connection tempConnection = factory.createConnection(); connections.add(tempConnection); - Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TemporaryQueue queue = tempSession.createTemporaryQueue(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = tempSession.createTemporaryQueue(); + final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return activeMQConnection.activeTempDestinations.containsKey(queue); @@ -246,9 +248,10 @@ public class JmsTempDestinationTest extends TestCase { try { message = session.createTextMessage("Hello"); producer.send(message); - fail("Send should fail since temp destination should not exist anymore."); + Assert.fail("Send should fail since temp destination should not exist anymore."); } catch (JMSException e) { + e.printStackTrace(); } } @@ -259,18 +262,23 @@ public class JmsTempDestinationTest extends TestCase { * @throws JMSException * @throws InterruptedException */ + @Test public void testPublishFailsForDestroyedTempDestination() throws Exception { Connection tempConnection = factory.createConnection(); connections.add(tempConnection); - Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TemporaryQueue queue = tempSession.createTemporaryQueue(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); + //In artemis, if you send a message to a topic where the consumer isn't there yet, + //message will get lost. So the create temp queue request has to happen + //after the connection is started (advisory consumer registered). + Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = tempSession.createTemporaryQueue(); + final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection; - assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return activeMQConnection.activeTempDestinations.containsKey(queue); @@ -293,10 +301,10 @@ public class JmsTempDestinationTest extends TestCase { try { message = session.createTextMessage("Hello"); producer.send(message); - fail("Send should fail since temp destination should not exist anymore."); + Assert.fail("Send should fail since temp destination should not exist anymore."); } catch (JMSException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } } @@ -305,6 +313,7 @@ public class JmsTempDestinationTest extends TestCase { * * @throws JMSException */ + @Test public void testDeleteDestinationWithSubscribersFails() throws JMSException { Connection connection = factory.createConnection(); connections.add(connection); @@ -319,65 +328,14 @@ public class JmsTempDestinationTest extends TestCase { // now closed. try { queue.delete(); - fail("Should fail as Subscribers are active"); + Assert.fail("Should fail as Subscribers are active"); } catch (JMSException e) { - assertTrue("failed to throw an exception", true); + Assert.assertTrue("failed to throw an exception", true); } } + //removed. the original test is only for vm transport. tcp transport will block anyway. public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception { - ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20"); - Connection connection = advisoryConnFactory.createConnection(); - connections.add(connection); - connection.start(); - - final CountDownLatch done = new CountDownLatch(1); - final AtomicBoolean ok = new AtomicBoolean(true); - final AtomicBoolean first = new AtomicBoolean(true); - VMTransport t = ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class); - t.setTransportListener(new TransportListener() { - @Override - public void onCommand(Object command) { - // block first dispatch for a while so broker backs up, but other connection should be able to proceed - if (first.compareAndSet(true, false)) { - try { - ok.set(done.await(35, TimeUnit.SECONDS)); - LOG.info("Done waiting: " + ok.get()); - } - catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - @Override - public void onException(IOException error) { - } - - @Override - public void transportInterupted() { - } - - @Override - public void transportResumed() { - } - }); - - connection = factory.createConnection(); - connections.add(connection); - ((ActiveMQConnection) connection).setWatchTopicAdvisories(false); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - for (int i = 0; i < 2500; i++) { - TemporaryQueue queue = session.createTemporaryQueue(); - MessageConsumer consumer = session.createConsumer(queue); - consumer.close(); - queue.delete(); - } - LOG.info("Done with work: " + ok.get()); - done.countDown(); - assertTrue("ok", ok.get()); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java index 8cac437405..0c0ab301d3 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java @@ -31,6 +31,7 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -56,6 +57,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); adapter.setConcurrentStoreAndDispatchQueues(false); broker.setPersistenceAdapter(adapter); + broker.addConnector("tcp://localhost:61616"); broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000"); if ("nio".equals(brokerTransportScheme)) { broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true"); @@ -73,7 +75,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { messageTextPrefix = initMessagePrefix(8 * 1024); sendMessages(dest, 500); - URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); + URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); LOG.info("consuming using uri: " + tcpBrokerUri); SocketProxy proxy = new SocketProxy(); @@ -104,7 +106,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { messageTextPrefix = initMessagePrefix(8 * 1024); sendMessages(dest, 500); - URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); + URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(2).getConnectUri()); LOG.info("consuming using uri: " + stompBrokerUri); SocketProxy proxy = new SocketProxy(); @@ -121,11 +123,12 @@ public class SoWriteTimeoutTest extends JmsTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + frame = "SUBSCRIBE\n" + "destination:jms.queue." + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); // ensure dispatch has started before pause frame = stompConnection.receiveFrame(); + System.out.println("frame: " + frame); assertTrue(frame.startsWith("MESSAGE")); proxy.pause(); @@ -148,7 +151,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { // verify connection is dead try { for (int i = 0; i < 200; i++) { - stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i); + stompConnection.send("jms.queue." + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i); } fail("expected send to fail with timeout out connection"); } @@ -164,6 +167,7 @@ public class SoWriteTimeoutTest extends JmsTestSupport { @Override protected void setUp() throws Exception { + BrokerService.disableWrapper = true; setAutoFail(true); super.setUp(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 4f8a0075ec..aa41a518c0 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -138,34 +139,36 @@ public class FakePostOffice implements PostOffice { } @Override - public void route(ServerMessage message, - QueueCreator creator, - RoutingContext context, - boolean direct) throws Exception { + public RoutingStatus route(ServerMessage message, + QueueCreator creator, + RoutingContext context, + boolean direct) throws Exception { + return RoutingStatus.OK; } @Override - public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception { + public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception { + return RoutingStatus.OK; + } + + @Override + public RoutingStatus route(ServerMessage message, + QueueCreator creator, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws Exception { + return RoutingStatus.OK; } @Override - public void route(ServerMessage message, - QueueCreator creator, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception { - - } - - @Override - public void route(ServerMessage message, + public RoutingStatus route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception { - + return RoutingStatus.OK; } @Override @@ -173,7 +176,7 @@ public class FakePostOffice implements PostOffice { } @Override - public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception { - + public RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception { + return RoutingStatus.OK; } } \ No newline at end of file