diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java index f7469154fc..8bf7a9a200 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java @@ -27,6 +27,25 @@ import org.apache.activemq.api.core.SimpleString; */ public interface ClientSession extends XAResource, AutoCloseable { + + /** + * This is used to identify a ClientSession as used by the JMS Layer + * The JMS Layer will add this through Meta-data, so the server or management layers + * can identify session created over core API purely or through the JMS Layer + */ + String JMS_SESSION_IDENTIFIER_PROPERTY = "jms-session"; + + + /** + * Just like {@link org.apache.activemq.api.core.client.ClientSession.AddressQuery#JMS_SESSION_IDENTIFIER_PROPERTY} this is + * used to identify the ClientID over JMS Session. + * However this is only used when the JMS Session.clientID is set (which is optional). + * With this property management tools and the server can identify the jms-client-id used over JMS + */ + String JMS_SESSION_CLIENT_ID_PROPERTY = "jms-client-id"; + + + /** * Information returned by a binding query * diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java index 2017d083f3..49fdd4701a 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java @@ -230,7 +230,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme try { - initialSession.addUniqueMetaData("jms-client-id", clientID); + initialSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); } catch (ActiveMQException e) { @@ -732,10 +732,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private void addSessionMetaData(ClientSession session) throws ActiveMQException { - session.addMetaData("jms-session", ""); + session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, ""); if (clientID != null) { - session.addMetaData("jms-client-id", clientID); + session.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); } } diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java index 5f13aafee0..94ff1c7160 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java @@ -409,16 +409,12 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To try { ClientSession.AddressQuery query = clientSession.addressQuery(address); - if (!query.isExists()) + + // if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side + // as that's a more efficient path for such operation + if (!query.isExists() && !query.isAutoCreateJmsQueues()) { - if (query.isAutoCreateJmsQueues()) - { - clientSession.createQueue(address, address, true); - } - else - { - throw new InvalidDestinationException("Destination " + address + " does not exist"); - } + throw new InvalidDestinationException("Destination " + address + " does not exist"); } else { diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java index 2d8b46ec50..8f16d76e31 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.management.Parameter; import org.apache.activemq.api.jms.JMSFactoryType; import org.apache.activemq.api.jms.management.ConnectionFactoryControl; @@ -737,9 +738,10 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo Map jmsSessions = new HashMap(); + // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller for (ServerSession session : sessions) { - if (session.getMetaData("jms-session") != null) + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) { jmsSessions.put(session.getConnectionID(), session); } @@ -754,7 +756,8 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo obj.put("connectionID", connection.getID().toString()); obj.put("clientAddress", connection.getRemoteAddress()); obj.put("creationTime", connection.getCreationTime()); - obj.put("clientID", session.getMetaData("jms-client-id")); + // Notice: this will be null when the user haven't set the client-id + obj.put("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY)); obj.put("principal", session.getUsername()); array.put(obj); } @@ -986,7 +989,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo public String closeConnectionWithClientID(final String clientID) throws Exception { - return server.getActiveMQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID); + return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); } private JSONObject toJSONObject(ServerConsumer consumer) throws Exception diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java index e0cff5f5d8..ad74b12e76 100644 --- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java +++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java @@ -18,7 +18,6 @@ package org.apache.activemq.jms.server.impl; import javax.naming.NamingException; import javax.transaction.xa.Xid; - import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -50,6 +49,7 @@ import org.apache.activemq.core.security.Role; import org.apache.activemq.core.server.ActivateCallback; import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.core.server.management.Notification; import org.apache.activemq.core.settings.impl.AddressSettings; @@ -62,9 +62,9 @@ import org.apache.activemq.jms.client.ActiveMQQueue; import org.apache.activemq.jms.client.ActiveMQTopic; import org.apache.activemq.jms.client.SelectorTranslator; import org.apache.activemq.jms.persistence.JMSStorageManager; +import org.apache.activemq.jms.persistence.config.PersistedBindings; import org.apache.activemq.jms.persistence.config.PersistedConnectionFactory; import org.apache.activemq.jms.persistence.config.PersistedDestination; -import org.apache.activemq.jms.persistence.config.PersistedBindings; import org.apache.activemq.jms.persistence.config.PersistedType; import org.apache.activemq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl; import org.apache.activemq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl; @@ -400,6 +400,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return; } + server.setJMSQueueCreator(new JMSQueueCreator()); + server.registerActivateCallback(this); /** * See this method's javadoc. @@ -491,6 +493,16 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback final boolean durable, final String... bindings) throws Exception { + return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings); + } + + protected boolean internalCreateJMSQueue(final boolean storeConfig, + final String queueName, + final String selectorString, + final boolean durable, + final boolean autoCreated, + final String... bindings) throws Exception + { if (active && queues.get(queueName) != null) { @@ -1881,4 +1893,29 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + + + + class JMSQueueCreator implements QueueCreator + { + private final SimpleString PREFIX = SimpleString.toSimpleString("jms.queue"); + @Override + public boolean create(SimpleString address) throws Exception + { + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + if (address.startsWith(PREFIX) && settings.isAutoCreateJmsQueues()) + { + // stopped here... finish here + JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(PREFIX.toString().length() + 1), null, true, true); + return true; + } + else + { + return false; + } + } + } + + + } diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index e049175109..000cc258ea 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -105,7 +105,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se true, //boolean xa, (String) null, this, - null); + null, + true); } @Override diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 4130d6e37b..b0963817e6 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.core.protocol.openwire; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSSecurityException; +import javax.jms.ResourceAllocationException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -31,15 +34,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSSecurityException; -import javax.jms.ResourceAllocationException; - import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.api.core.ActiveMQSecurityException; +import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.BrokerInfo; @@ -74,19 +74,6 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; -import org.apache.activemq.core.server.QueueQueryResult; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.state.CommandVisitor; -import org.apache.activemq.state.ConnectionState; -import org.apache.activemq.state.ConsumerState; -import org.apache.activemq.state.ProducerState; -import org.apache.activemq.state.SessionState; -import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.TransmitCallback; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.protocol.openwire.amq.AMQBrokerStoppedException; import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.core.protocol.openwire.amq.AMQConsumerBrokerExchange; @@ -101,10 +88,21 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat import org.apache.activemq.core.remoting.CloseListener; import org.apache.activemq.core.remoting.FailureListener; import org.apache.activemq.core.server.ActiveMQServerLogger; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; +import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.state.ConnectionState; +import org.apache.activemq.state.ConsumerState; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.state.SessionState; +import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.TransmitCallback; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.utils.ConcurrentHashSet; +import org.apache.activemq.wireformat.WireFormat; /** * Represents an activemq connection. @@ -1403,12 +1401,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor if (producerExchange.canDispatch(messageSend)) { - if (messageSend.getDestination().isQueue()) - { - SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination()); - autoCreateQueueIfPossible(queueName, session); - } - SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); if (result.isBlockNextSend()) { @@ -1458,15 +1450,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor return resp; } - public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception - { - QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); - if (result.isAutoCreateJmsQueues() && !result.isExists()) - { - session.getCoreServer().createQueue(queueName, queueName, null, false, false, true); - } - } - private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { AMQProducerBrokerExchange result = producerExchanges.get(id); diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java index 2ea2286076..171034c965 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java @@ -44,6 +44,7 @@ import org.apache.activemq.core.server.ActiveMQMessageBundle; import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.ServerConsumer; import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.impl.ActiveMQServerImpl; @@ -72,6 +73,7 @@ public class AMQServerSession extends ServerSessionImpl SecurityStore securityStore, ManagementService managementService, ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress, SimpleString simpleString, SessionCallback callback, + QueueCreator queueCreator, OperationContext context) throws Exception { super(name, username, password, @@ -83,7 +85,8 @@ public class AMQServerSession extends ServerSessionImpl securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, - context, new AMQTransactionFactory()); + context, new AMQTransactionFactory(), + queueCreator); } //create a fake session just for security check @@ -387,7 +390,7 @@ public class AMQServerSession extends ServerSessionImpl try { - postOffice.route(msg, routingContext, direct); + postOffice.route(msg, getQueueCreator(), routingContext, direct); Pair value = targetAddressInfos.get(msg.getAddress()); diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java index 088db65d7a..908eded798 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java @@ -21,6 +21,7 @@ import org.apache.activemq.core.persistence.OperationContext; import org.apache.activemq.core.persistence.StorageManager; import org.apache.activemq.core.postoffice.PostOffice; import org.apache.activemq.core.security.SecurityStore; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.ServerSessionFactory; import org.apache.activemq.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.core.server.impl.ServerSessionImpl; @@ -41,13 +42,13 @@ public class AMQServerSessionFactory implements ServerSessionFactory PostOffice postOffice, ResourceManager resourceManager, SecurityStore securityStore, ManagementService managementService, ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress, - SimpleString simpleString, SessionCallback callback, + SimpleString simpleString, SessionCallback callback, QueueCreator queueCreator, OperationContext context) throws Exception { return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, - managementService, activeMQServerImpl, managementAddress, simpleString, callback, + managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator, context); } diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java index 54dc8cbc5a..b692183b4c 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.core.protocol.openwire.amq; +import javax.transaction.xa.Xid; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -28,8 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import javax.transaction.xa.Xid; - +import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; @@ -46,9 +46,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.core.server.ActiveMQServerLogger; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.paging.impl.PagingStoreImpl; import org.apache.activemq.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter; @@ -56,12 +53,14 @@ import org.apache.activemq.core.protocol.openwire.OpenWireProtocolManager; import org.apache.activemq.core.protocol.openwire.OpenWireUtil; import org.apache.activemq.core.protocol.openwire.SendingResult; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.ServerConsumer; import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.impl.ServerMessageImpl; import org.apache.activemq.core.transaction.impl.XidImpl; import org.apache.activemq.spi.core.protocol.SessionCallback; import org.apache.activemq.spi.core.remoting.ReadyListener; +import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { @@ -109,7 +108,7 @@ public class AMQSession implements SessionCallback { coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, - null, this, new AMQServerSessionFactory()); + null, this, new AMQServerSessionFactory(), true); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) @@ -143,7 +142,7 @@ public class AMQSession implements SessionCallback if (d.isQueue()) { SimpleString queueName = OpenWireUtil.toCoreAddress(d); - connection.autoCreateQueueIfPossible(queueName, this); + getCoreServer().getJMSQueueCreator().create(queueName); } AMQConsumer consumer = new AMQConsumer(this, d, info); consumer.init(); diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index 3780180faf..a2a8b44711 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -274,7 +274,7 @@ class StompProtocolManager implements ProtocolManager, No false, false, null, - stompSession, null); + stompSession, null, true); stompSession.setServerSession(session); sessions.put(connection.getID(), stompSession); } @@ -299,7 +299,7 @@ class StompProtocolManager implements ProtocolManager, No false, false, null, - stompSession, null); + stompSession, null, true); stompSession.setServerSession(session); transactedSessions.put(txID, stompSession); } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java index 3c832096f4..80811c04ba 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java @@ -561,11 +561,11 @@ public class ActiveMQActivation spec.getTransactionTimeout()); result.addMetaData("resource-adapter", "inbound"); - result.addMetaData("jms-session", ""); + result.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, ""); String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID(); if (clientID != null) { - result.addMetaData("jms-client-id", clientID); + result.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID); } ActiveMQRALogger.LOGGER.debug("Using queue connection " + result); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java index 761e77f31d..fdc8044949 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java @@ -24,6 +24,7 @@ import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.server.ActiveMQComponent; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.transaction.Transaction; @@ -66,15 +67,15 @@ public interface PostOffice extends ActiveMQComponent Map getAllBindings(); - void route(ServerMessage message, boolean direct) throws Exception; + void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; - void route(ServerMessage message, Transaction tx, boolean direct) throws Exception; + void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; - void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; + void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; - void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception; + void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct) throws Exception; - void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception; + void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception; MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java index 1c8c811ec5..1294a39097 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java @@ -61,6 +61,7 @@ import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.LargeServerMessage; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.QueueFactory; import org.apache.activemq.core.server.RouteContextList; import org.apache.activemq.core.server.RoutingContext; @@ -81,6 +82,10 @@ import org.apache.activemq.core.transaction.impl.TransactionImpl; import org.apache.activemq.utils.TypedProperties; import org.apache.activemq.utils.UUIDGenerator; +/** + * This is the class that will make the routing to Queues and decide which consumer will get the messages + * It's the queue component on distributing the messages * * + */ public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory { private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -605,30 +610,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return addressManager.getBindings(); } - public void route(final ServerMessage message, final boolean direct) throws Exception + public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception { - route(message, (Transaction) null, direct); + route(message, queueCreator, (Transaction) null, direct); } - public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception + public void route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception { - route(message, new RoutingContextImpl(tx), direct); + route(message, queueCreator, new RoutingContextImpl(tx), direct); } public void route(final ServerMessage message, + final QueueCreator queueCreator, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { - route(message, new RoutingContextImpl(tx), direct, rejectDuplicates); + route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates); } - public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception + public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, final boolean direct) throws Exception { - route(message, context, direct, true); + route(message, queueCreator, context, direct, true); } public void route(final ServerMessage message, + final QueueCreator queueCreator, final RoutingContext context, final boolean direct, boolean rejectDuplicates) throws Exception @@ -661,6 +668,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Bindings bindings = addressManager.getBindingsForRoutingAddress(address); + // first check for the auto-queue creation thing + if (bindings == null && queueCreator != null) + { + // There is no queue with this address, we will check if it needs to be created + if (queueCreator.create(address)) + { + // TODO: this is not working!!!! + // reassign bindings if it was created + bindings = addressManager.getBindingsForRoutingAddress(address); + } + } + if (bindings != null) { bindings.route(message, context); @@ -708,7 +727,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); - route(message, context.getTransaction(), false); + route(message, null, context.getTransaction(), false); } } else diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java index fa3330efba..e6ae69f157 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -177,7 +177,7 @@ public class ActiveMQPacketHandler implements ChannelHandler request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, - channel), null); + channel), null, true); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java index 8888588368..fde78d447c 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java @@ -111,7 +111,8 @@ public interface ActiveMQServer extends ActiveMQComponent boolean xa, String defaultAddress, SessionCallback callback, - ServerSessionFactory sessionFactory) throws Exception; + ServerSessionFactory sessionFactory, + boolean autoCreateQueues) throws Exception; SecurityStore getSecurityStore(); @@ -142,6 +143,19 @@ public interface ActiveMQServer extends ActiveMQComponent boolean isActive(); + /** + * This is the queue creator responsible for JMS Queue creations* + * @param queueCreator + */ + void setJMSQueueCreator(QueueCreator queueCreator); + + /** + * @see {@link org.apache.activemq.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)} * + * * + * @return + */ + QueueCreator getJMSQueueCreator(); + /** * Wait for server initialization. * @param timeout diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java new file mode 100644 index 0000000000..c1c272d132 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.core.server; + +import org.apache.activemq.api.core.SimpleString; + +public interface QueueCreator +{ + /** + * + * You should return true if you even tried to create the queue and the queue was already there. + * As the callers of this method will use that as an indicator that they should re-route the messages. + * * + * @return True if a queue was created. + */ + boolean create(SimpleString address) throws Exception; +} diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java index 363385f074..b599562e77 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java @@ -75,6 +75,8 @@ public interface ServerSession void xaSuspend() throws Exception; + QueueCreator getQueueCreator(); + List xaGetInDoubtXids(); int xaGetTimeout(); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java index 322458c45b..4efe63d178 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java @@ -40,6 +40,6 @@ public interface ServerSessionFactory SecurityStore securityStore, ManagementService managementService, ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress, SimpleString simpleString, SessionCallback callback, - OperationContext context) throws Exception; + QueueCreator queueCreator, OperationContext context) throws Exception; } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index 973f3e1290..4efb8f721d 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -99,6 +99,7 @@ import org.apache.activemq.core.server.LargeServerMessage; import org.apache.activemq.core.server.MemoryManager; import org.apache.activemq.core.server.NodeManager; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.QueueFactory; import org.apache.activemq.core.server.ServerSession; import org.apache.activemq.core.server.ServerSessionFactory; @@ -222,6 +223,11 @@ public class ActiveMQServerImpl implements ActiveMQServer private MemoryManager memoryManager; + /** + * This will be set by the JMS Queue Manager. + */ + private QueueCreator jmsQueueCreator; + private final Map sessions = new ConcurrentHashMap(); /** @@ -593,6 +599,18 @@ public class ActiveMQServerImpl implements ActiveMQServer stop(failoverOnServerShutdown, false, false); } + @Override + public QueueCreator getJMSQueueCreator() + { + return jmsQueueCreator; + } + + @Override + public void setJMSQueueCreator(QueueCreator jmsQueueCreator) + { + this.jmsQueueCreator = jmsQueueCreator; + } + /** * Stops the server * @param criticalIOError whether we have encountered an IO error with the journal etc @@ -1007,6 +1025,7 @@ public class ActiveMQServerImpl implements ActiveMQServer return backupManager; } + @Override public ServerSession createSession(final String name, final String username, final String password, @@ -1018,7 +1037,8 @@ public class ActiveMQServerImpl implements ActiveMQServer final boolean xa, final String defaultAddress, final SessionCallback callback, - final ServerSessionFactory sessionFactory) throws Exception + final ServerSessionFactory sessionFactory, + final boolean autoCreateQueues) throws Exception { if (securityStore != null) @@ -1026,14 +1046,22 @@ public class ActiveMQServerImpl implements ActiveMQServer securityStore.authenticate(username, password); } final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); - final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory); + final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, + connection, autoCommitSends, autoCommitAcks, preAcknowledge, + xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues); sessions.put(name, session); return session; } - protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception + protected ServerSessionImpl internalCreateSession(String name, String username, + String password, int minLargeMessageSize, + RemotingConnection connection, boolean autoCommitSends, + boolean autoCommitAcks, boolean preAcknowledge, boolean xa, + String defaultAddress, SessionCallback callback, + OperationContext context, ServerSessionFactory sessionFactory, + boolean autoCreateJMSQueues) throws Exception { if (sessionFactory == null) { @@ -1057,7 +1085,8 @@ public class ActiveMQServerImpl implements ActiveMQServer defaultAddress == null ? null : new SimpleString(defaultAddress), callback, - context); + context, + autoCreateJMSQueues ? jmsQueueCreator : null); } else { @@ -1081,6 +1110,7 @@ public class ActiveMQServerImpl implements ActiveMQServer defaultAddress == null ? null : new SimpleString(defaultAddress), callback, + jmsQueueCreator, context); } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java index c0068f3928..b080351daf 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java @@ -110,7 +110,7 @@ public class DivertImpl implements Divert copy = message; } - postOffice.route(copy, context.getTransaction(), false); + postOffice.route(copy, null, context.getTransaction(), false); } @Override diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java index b0e4289fcd..c6f94d651d 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java @@ -2449,7 +2449,7 @@ public class QueueImpl implements Queue copyMessage.setAddress(toAddress); - postOffice.route(copyMessage, tx, false, rejectDuplicate); + postOffice.route(copyMessage, null, tx, false, rejectDuplicate); acknowledge(tx, ref); } @@ -2673,7 +2673,7 @@ public class QueueImpl implements Queue copyMessage.setAddress(address); - postOffice.route(copyMessage, tx, false, rejectDuplicate); + postOffice.route(copyMessage, null, tx, false, rejectDuplicate); acknowledge(tx, ref); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java index 4cfc0220eb..6b4939ee46 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java @@ -35,6 +35,7 @@ import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.api.core.Message; import org.apache.activemq.api.core.Pair; import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.management.CoreNotificationType; import org.apache.activemq.api.core.management.ManagementHelper; import org.apache.activemq.api.core.management.ResourceNames; @@ -63,6 +64,7 @@ import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.LargeServerMessage; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.QueueQueryResult; import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.ServerConsumer; @@ -154,6 +156,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener private final OperationContext context; + private QueueCreator queueCreator; + // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here protected final Map> targetAddressInfos = new HashMap>(); @@ -169,8 +173,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener private final TransactionFactory transactionFactory; - // Constructors --------------------------------------------------------------------------------- - //create an 'empty' session. Only used by AMQServerSession //in order to check username and password protected ServerSessionImpl(String username, String password) @@ -193,35 +195,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener this.managementAddress = null; this.context = null; this.callback = null; - } - - public ServerSessionImpl(final String name, - final String username, - final String password, - final int minLargeMessageSize, - final boolean autoCommitSends, - final boolean autoCommitAcks, - final boolean preAcknowledge, - final boolean strictUpdateDeliveryCount, - final boolean xa, - final RemotingConnection remotingConnection, - final StorageManager storageManager, - final PostOffice postOffice, - final ResourceManager resourceManager, - final SecurityStore securityStore, - final ManagementService managementService, - final ActiveMQServer server, - final SimpleString managementAddress, - final SimpleString defaultAddress, - final SessionCallback callback, - final OperationContext context) throws Exception - { - this(name, username, password, minLargeMessageSize, - autoCommitSends, autoCommitAcks, preAcknowledge, - strictUpdateDeliveryCount, xa, remotingConnection, - storageManager, postOffice, resourceManager, securityStore, - managementService, server, managementAddress, defaultAddress, - callback, context, null); + this.queueCreator = null; } public ServerSessionImpl(final String name, @@ -244,7 +218,38 @@ public class ServerSessionImpl implements ServerSession, FailureListener final SimpleString defaultAddress, final SessionCallback callback, final OperationContext context, - TransactionFactory transactionFactory) throws Exception + final QueueCreator queueCreator) throws Exception + { + this(name, username, password, minLargeMessageSize, + autoCommitSends, autoCommitAcks, preAcknowledge, + strictUpdateDeliveryCount, xa, remotingConnection, + storageManager, postOffice, resourceManager, securityStore, + managementService, server, managementAddress, defaultAddress, + callback, context, null, queueCreator); + } + + public ServerSessionImpl(final String name, + final String username, + final String password, + final int minLargeMessageSize, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final boolean strictUpdateDeliveryCount, + final boolean xa, + final RemotingConnection remotingConnection, + final StorageManager storageManager, + final PostOffice postOffice, + final ResourceManager resourceManager, + final SecurityStore securityStore, + final ManagementService managementService, + final ActiveMQServer server, + final SimpleString managementAddress, + final SimpleString defaultAddress, + final SessionCallback callback, + final OperationContext context, + TransactionFactory transactionFactory, + final QueueCreator queueCreator) throws Exception { this.username = username; @@ -288,6 +293,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener remotingConnection.addFailureListener(this); this.context = context; + this.queueCreator = queueCreator; + if (transactionFactory == null) { this.transactionFactory = new DefaultTransactionFactory(); @@ -421,6 +428,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener } } + + + public QueueCreator getQueueCreator() + { + return queueCreator; + } + public ServerConsumer createConsumer(final long consumerID, final SimpleString queueName, final SimpleString filterString, @@ -1596,6 +1610,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { data = metaData.get(key); } + + if (key.equals(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY)) + { + // we know it's a JMS Session, we now install JMS Hooks of any kind + installJMSHooks(); + } return data; } @@ -1709,16 +1729,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener connectionFailed(me, failedOver); } - // Public - // ---------------------------------------------------------------------------- - public void clearLargeMessage() { currentLargeMessage = null; } - // Private - // ---------------------------------------------------------------------------- + + + private void installJMSHooks() + { + this.queueCreator = server.getJMSQueueCreator(); + } + private Map> cloneTargetAddresses() { @@ -1846,7 +1868,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener try { - postOffice.route(msg, routingContext, direct); + postOffice.route(msg, queueCreator, routingContext, direct); Pair value = targetAddressInfos.get(msg.getAddress()); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java index bef361ceae..6a5443881f 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java @@ -728,7 +728,7 @@ public class ManagementServiceImpl implements ManagementService new SimpleString(notification.getUID())); } - postOffice.route(notificationMessage, false); + postOffice.route(notificationMessage, null, false); } } } diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java index 4c91e77f08..549f60c967 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java @@ -175,7 +175,7 @@ public class IncomingVertxEventHandler implements ConnectorService try { - postOffice.route(msg, false); + postOffice.route(msg, null, false); } catch (Exception e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java index 53372d107e..d785ceb275 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java @@ -72,6 +72,38 @@ public class AutoCreateJmsQueueTest extends JMSTestBase connection.close(); } + @Test + public void testAutoCreateOnSendToQueueAnonymousProducer() throws Exception + { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test"); + + MessageProducer producer = session.createProducer(null); + + final int numMessages = 100; + + for (int i = 0; i < numMessages; i++) + { + TextMessage mess = session.createTextMessage("msg" + i); + producer.send(queue, mess); + } + + producer.close(); + + MessageConsumer messageConsumer = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < numMessages; i++) + { + Message m = messageConsumer.receive(5000); + Assert.assertNotNull(m); + } + + connection.close(); + } + @Test public void testAutoCreateOnSendToQueueSecurity() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java index 1b6349a94f..bc96752994 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java @@ -661,7 +661,7 @@ public class HangConsumerTest extends ServiceTestBase } @Override - protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception + protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory, boolean autoCreateQueue) throws Exception { return new ServerSessionImpl(name, username, @@ -683,7 +683,8 @@ public class HangConsumerTest extends ServiceTestBase defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), - context); + context, + null); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java index 0032e62c31..172958f4f7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.tests.integration.openwire; +import javax.jms.ConnectionFactory; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -23,10 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import javax.jms.ConnectionFactory; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; - import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.jms.management.JMSServerControl; @@ -75,6 +74,7 @@ public class OpenWireTestBase extends ServiceTestBase Map addressSettings = serverConfig.getAddressesSettings(); String match = "jms.queue.#"; AddressSettings dlaSettings = new AddressSettings(); + dlaSettings.setAutoCreateJmsQueues(false); SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ"); dlaSettings.setDeadLetterAddress(dla); addressSettings.put(match, dlaSettings); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java index 6b88682f01..588291ed8f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.core.settings.impl.AddressSettings; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -293,6 +294,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest assertTrue(message1.getText().equals(message.getText())); } + @Test + public void testAutoDestinationNoCreationOnConsumer() throws JMSException + { + AddressSettings addressSetting = new AddressSettings(); + addressSetting.setAutoCreateJmsQueues(false); + + String address = "foo"; + server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TextMessage message = session.createTextMessage("bar"); + Queue queue = new ActiveMQQueue(address); + + try + { + MessageConsumer consumer = session.createConsumer(queue); + Assert.fail("supposed to throw an exception here"); + } + catch (JMSException e) + { + + } + } + /** * This is the example shipped with the distribution * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java index 7141372da6..b47e20d423 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java @@ -16,26 +16,25 @@ */ package org.apache.activemq.tests.integration.openwire.amq; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.transport.tcp.TcpTransport; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java index 70cdb7f04f..401d958433 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -29,6 +29,7 @@ import org.apache.activemq.core.postoffice.PostOffice; import org.apache.activemq.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueCreator; import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.impl.MessageReferenceImpl; @@ -153,45 +154,27 @@ public class FakePostOffice implements PostOffice return new MessageReferenceImpl(); } - public void route(final ServerMessage message, final Transaction tx) throws Exception + public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct) throws Exception { } - public void route(final ServerMessage message, final RoutingContext context) throws Exception - { - - - } - - public void route(ServerMessage message, boolean direct) throws Exception - { - - - } - - public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception - { - - - } - - public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception + public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception { } @Override - public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception + public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception { } @Override - public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception + public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception { @@ -200,8 +183,11 @@ public class FakePostOffice implements PostOffice @Override public void processRoute(ServerMessage message, RoutingContext context, boolean direct) throws Exception { - - } + @Override + public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception + { + + } } \ No newline at end of file