From 95b6328993b8215262cb49e0bc1e5999d3537e12 Mon Sep 17 00:00:00 2001
From: Clebert Suconic <clebertsuconic@apache.org>
Date: Tue, 21 Apr 2015 18:00:05 -0400
Subject: [PATCH] Queue Auto-create fixes on OpenWire

this is basically addressing a performance issue on OpenWire, setting the auto-create to the PostOffice
after not being able to route

The core protocol stays the same in regard to the auto-create since the exceptions are happening after the queueQuery
---
 .../api/core/client/ClientSession.java        | 19 ++++
 .../jms/client/ActiveMQConnection.java        |  6 +-
 .../jms/client/ActiveMQMessageProducer.java   | 14 +--
 .../management/impl/JMSServerControlImpl.java |  9 +-
 .../jms/server/impl/JMSServerManagerImpl.java | 41 +++++++-
 .../ProtonSessionIntegrationCallback.java     |  3 +-
 .../protocol/openwire/OpenWireConnection.java | 47 +++------
 .../openwire/amq/AMQServerSession.java        |  7 +-
 .../openwire/amq/AMQServerSessionFactory.java |  5 +-
 .../protocol/openwire/amq/AMQSession.java     | 13 ++-
 .../protocol/stomp/StompProtocolManager.java  |  4 +-
 .../ra/inflow/ActiveMQActivation.java         |  4 +-
 .../activemq/core/postoffice/PostOffice.java  | 11 ++-
 .../core/postoffice/impl/PostOfficeImpl.java  | 35 +++++--
 .../core/impl/ActiveMQPacketHandler.java      |  2 +-
 .../activemq/core/server/ActiveMQServer.java  | 16 ++-
 .../activemq/core/server/QueueCreator.java    | 32 ++++++
 .../activemq/core/server/ServerSession.java   |  2 +
 .../core/server/ServerSessionFactory.java     |  2 +-
 .../core/server/impl/ActiveMQServerImpl.java  | 38 ++++++-
 .../activemq/core/server/impl/DivertImpl.java |  2 +-
 .../activemq/core/server/impl/QueueImpl.java  |  4 +-
 .../core/server/impl/ServerSessionImpl.java   | 98 ++++++++++++-------
 .../impl/ManagementServiceImpl.java           |  2 +-
 .../vertx/IncomingVertxEventHandler.java      |  2 +-
 .../client/AutoCreateJmsQueueTest.java        | 32 ++++++
 .../integration/client/HangConsumerTest.java  |  5 +-
 .../openwire/OpenWireTestBase.java            |  8 +-
 .../openwire/SimpleOpenWireTest.java          | 27 +++++
 .../openwire/amq/ProducerFlowControlTest.java | 13 ++-
 .../server/impl/fakes/FakePostOffice.java     | 34 ++-----
 31 files changed, 372 insertions(+), 165 deletions(-)
 create mode 100644 activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java

diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
index f7469154fc..8bf7a9a200 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
@@ -27,6 +27,25 @@ import org.apache.activemq.api.core.SimpleString;
  */
 public interface ClientSession extends XAResource, AutoCloseable
 {
+
+   /**
+    * This is used to identify a ClientSession as used by the JMS Layer
+    * The JMS Layer will add this through Meta-data, so the server or management layers
+    * can identify session created over core API purely or through the JMS Layer
+    */
+   String JMS_SESSION_IDENTIFIER_PROPERTY = "jms-session";
+
+
+   /**
+    * Just like {@link org.apache.activemq.api.core.client.ClientSession.AddressQuery#JMS_SESSION_IDENTIFIER_PROPERTY} this is
+    * used to identify the ClientID over JMS Session.
+    * However this is only used when the JMS Session.clientID is set (which is optional).
+    * With this property management tools and the server can identify the jms-client-id used over JMS
+    */
+   String JMS_SESSION_CLIENT_ID_PROPERTY = "jms-client-id";
+
+
+
    /**
     * Information returned by a binding query
     *
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
index 2017d083f3..49fdd4701a 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
@@ -230,7 +230,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
 
       try
       {
-         initialSession.addUniqueMetaData("jms-client-id", clientID);
+         initialSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
       }
       catch (ActiveMQException e)
       {
@@ -732,10 +732,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
 
    private void addSessionMetaData(ClientSession session) throws ActiveMQException
    {
-      session.addMetaData("jms-session", "");
+      session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
       if (clientID != null)
       {
-         session.addMetaData("jms-client-id", clientID);
+         session.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
       }
    }
 
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
index 5f13aafee0..94ff1c7160 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
@@ -409,16 +409,12 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
             try
             {
                ClientSession.AddressQuery query = clientSession.addressQuery(address);
-               if (!query.isExists())
+
+               // if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
+               // as that's a more efficient path for such operation
+               if (!query.isExists() && !query.isAutoCreateJmsQueues())
                {
-                  if (query.isAutoCreateJmsQueues())
-                  {
-                     clientSession.createQueue(address, address, true);
-                  }
-                  else
-                  {
-                     throw new InvalidDestinationException("Destination " + address + " does not exist");
-                  }
+                  throw new InvalidDestinationException("Destination " + address + " does not exist");
                }
                else
                {
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
index 2d8b46ec50..8f16d76e31 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.management.Parameter;
 import org.apache.activemq.api.jms.JMSFactoryType;
 import org.apache.activemq.api.jms.management.ConnectionFactoryControl;
@@ -737,9 +738,10 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
 
          Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
 
+         // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller
          for (ServerSession session : sessions)
          {
-            if (session.getMetaData("jms-session") != null)
+            if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null)
             {
                jmsSessions.put(session.getConnectionID(), session);
             }
@@ -754,7 +756,8 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
                obj.put("connectionID", connection.getID().toString());
                obj.put("clientAddress", connection.getRemoteAddress());
                obj.put("creationTime", connection.getCreationTime());
-               obj.put("clientID", session.getMetaData("jms-client-id"));
+               // Notice: this will be null when the user haven't set the client-id
+               obj.put("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY));
                obj.put("principal", session.getUsername());
                array.put(obj);
             }
@@ -986,7 +989,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
 
    public String closeConnectionWithClientID(final String clientID) throws Exception
    {
-      return server.getActiveMQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID);
+      return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
    }
 
    private JSONObject toJSONObject(ServerConsumer consumer) throws Exception
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
index e0cff5f5d8..ad74b12e76 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
@@ -18,7 +18,6 @@ package org.apache.activemq.jms.server.impl;
 
 import javax.naming.NamingException;
 import javax.transaction.xa.Xid;
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.activemq.core.security.Role;
 import org.apache.activemq.core.server.ActivateCallback;
 import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.core.server.management.Notification;
 import org.apache.activemq.core.settings.impl.AddressSettings;
@@ -62,9 +62,9 @@ import org.apache.activemq.jms.client.ActiveMQQueue;
 import org.apache.activemq.jms.client.ActiveMQTopic;
 import org.apache.activemq.jms.client.SelectorTranslator;
 import org.apache.activemq.jms.persistence.JMSStorageManager;
+import org.apache.activemq.jms.persistence.config.PersistedBindings;
 import org.apache.activemq.jms.persistence.config.PersistedConnectionFactory;
 import org.apache.activemq.jms.persistence.config.PersistedDestination;
-import org.apache.activemq.jms.persistence.config.PersistedBindings;
 import org.apache.activemq.jms.persistence.config.PersistedType;
 import org.apache.activemq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
 import org.apache.activemq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
@@ -400,6 +400,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
          return;
       }
 
+      server.setJMSQueueCreator(new JMSQueueCreator());
+
       server.registerActivateCallback(this);
       /**
        * See this method's javadoc.
@@ -491,6 +493,16 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
                                            final boolean durable,
                                            final String... bindings) throws Exception
    {
+      return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings);
+   }
+
+   protected boolean internalCreateJMSQueue(final boolean storeConfig,
+                                         final String queueName,
+                                         final String selectorString,
+                                         final boolean durable,
+                                         final boolean autoCreated,
+                                         final String... bindings) throws Exception
+   {
 
       if (active && queues.get(queueName) != null)
       {
@@ -1881,4 +1893,29 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
       }
    }
 
+
+
+
+   class JMSQueueCreator implements QueueCreator
+   {
+      private final SimpleString PREFIX = SimpleString.toSimpleString("jms.queue");
+      @Override
+      public boolean create(SimpleString address) throws Exception
+      {
+         AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+         if (address.startsWith(PREFIX) && settings.isAutoCreateJmsQueues())
+         {
+            // stopped here... finish here
+            JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(PREFIX.toString().length() + 1), null, true, true);
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+   }
+
+
+
 }
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index e049175109..000cc258ea 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -105,7 +105,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
                                                         true, //boolean xa,
                                                         (String) null,
                                                         this,
-                                                        null);
+                                                        null,
+                                                        true);
    }
 
    @Override
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 4130d6e37b..b0963817e6 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.core.protocol.openwire;
 
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSSecurityException;
+import javax.jms.ResourceAllocationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,15 +34,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSSecurityException;
-import javax.jms.ResourceAllocationException;
-
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQBuffers;
 import org.apache.activemq.api.core.ActiveMQException;
 import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.api.core.ActiveMQSecurityException;
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
@@ -74,19 +74,6 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.core.server.QueueQueryResult;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransmitCallback;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
@@ -101,10 +88,21 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
 import org.apache.activemq.core.remoting.CloseListener;
 import org.apache.activemq.core.remoting.FailureListener;
 import org.apache.activemq.core.server.ActiveMQServerLogger;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ConsumerState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.TransmitCallback;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.utils.ConcurrentHashSet;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents an activemq connection.
@@ -1403,12 +1401,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
 
          if (producerExchange.canDispatch(messageSend))
          {
-            if (messageSend.getDestination().isQueue())
-            {
-               SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
-               autoCreateQueueIfPossible(queueName, session);
-            }
-
             SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
             if (result.isBlockNextSend())
             {
@@ -1458,15 +1450,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       return resp;
    }
 
-   public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
-   {
-      QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
-      if (result.isAutoCreateJmsQueues() && !result.isExists())
-      {
-         session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
-      }
-   }
-
    private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
    {
       AMQProducerBrokerExchange result = producerExchanges.get(id);
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
index 2ea2286076..171034c965 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
@@ -44,6 +44,7 @@ import org.apache.activemq.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.ServerConsumer;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
@@ -72,6 +73,7 @@ public class AMQServerSession extends ServerSessionImpl
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
          SimpleString simpleString, SessionCallback callback,
+         QueueCreator queueCreator,
          OperationContext context) throws Exception
    {
       super(name, username, password,
@@ -83,7 +85,8 @@ public class AMQServerSession extends ServerSessionImpl
          securityStore, managementService,
          activeMQServerImpl, managementAddress,
          simpleString, callback,
-         context, new AMQTransactionFactory());
+         context, new AMQTransactionFactory(),
+         queueCreator);
    }
 
    //create a fake session just for security check
@@ -387,7 +390,7 @@ public class AMQServerSession extends ServerSessionImpl
 
       try
       {
-         postOffice.route(msg, routingContext, direct);
+         postOffice.route(msg, getQueueCreator(), routingContext, direct);
 
          Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
 
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 088db65d7a..908eded798 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -21,6 +21,7 @@ import org.apache.activemq.core.persistence.OperationContext;
 import org.apache.activemq.core.persistence.StorageManager;
 import org.apache.activemq.core.postoffice.PostOffice;
 import org.apache.activemq.core.security.SecurityStore;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.ServerSessionFactory;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.core.server.impl.ServerSessionImpl;
@@ -41,13 +42,13 @@ public class AMQServerSessionFactory implements ServerSessionFactory
          PostOffice postOffice, ResourceManager resourceManager,
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
-         SimpleString simpleString, SessionCallback callback,
+         SimpleString simpleString, SessionCallback callback, QueueCreator queueCreator,
          OperationContext context) throws Exception
    {
       return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends,
             autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa,
             connection, storageManager, postOffice, resourceManager, securityStore,
-            managementService, activeMQServerImpl, managementAddress, simpleString, callback,
+            managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator,
             context);
    }
 
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
index 54dc8cbc5a..b692183b4c 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.core.protocol.openwire.amq;
 
+import javax.transaction.xa.Xid;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,8 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.transaction.xa.Xid;
-
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
@@ -46,9 +46,6 @@ import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.core.server.ActiveMQServerLogger;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter;
@@ -56,12 +53,14 @@ import org.apache.activemq.core.protocol.openwire.OpenWireProtocolManager;
 import org.apache.activemq.core.protocol.openwire.OpenWireUtil;
 import org.apache.activemq.core.protocol.openwire.SendingResult;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.ServerConsumer;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.core.transaction.impl.XidImpl;
 import org.apache.activemq.spi.core.protocol.SessionCallback;
 import org.apache.activemq.spi.core.remoting.ReadyListener;
+import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback
 {
@@ -109,7 +108,7 @@ public class AMQSession implements SessionCallback
       {
          coreSession = (AMQServerSession) server.createSession(name, username, password,
                minLargeMessageSize, connection, true, false, false, false,
-               null, this, new AMQServerSessionFactory());
+               null, this, new AMQServerSessionFactory(), true);
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1)
@@ -143,7 +142,7 @@ public class AMQSession implements SessionCallback
          if (d.isQueue())
          {
             SimpleString queueName = OpenWireUtil.toCoreAddress(d);
-            connection.autoCreateQueueIfPossible(queueName, this);
+            getCoreServer().getJMSQueueCreator().create(queueName);
          }
          AMQConsumer consumer = new AMQConsumer(this, d, info);
          consumer.init();
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index 3780180faf..a2a8b44711 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -274,7 +274,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession, null);
+                                                      stompSession, null, true);
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -299,7 +299,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession, null);
+                                                      stompSession, null, true);
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
index 3c832096f4..80811c04ba 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
@@ -561,11 +561,11 @@ public class ActiveMQActivation
                                    spec.getTransactionTimeout());
 
          result.addMetaData("resource-adapter", "inbound");
-         result.addMetaData("jms-session", "");
+         result.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
          String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID();
          if (clientID != null)
          {
-            result.addMetaData("jms-client-id", clientID);
+            result.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
          }
 
          ActiveMQRALogger.LOGGER.debug("Using queue connection " + result);
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
index 761e77f31d..fdc8044949 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
@@ -24,6 +24,7 @@ import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.server.ActiveMQComponent;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.transaction.Transaction;
@@ -66,15 +67,15 @@ public interface PostOffice extends ActiveMQComponent
 
    Map<SimpleString, Binding> getAllBindings();
 
-   void route(ServerMessage message, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
 
-   void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
 
-   void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
 
    MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
 
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
index 1c8c811ec5..1294a39097 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
@@ -61,6 +61,7 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueFactory;
 import org.apache.activemq.core.server.RouteContextList;
 import org.apache.activemq.core.server.RoutingContext;
@@ -81,6 +82,10 @@ import org.apache.activemq.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.utils.TypedProperties;
 import org.apache.activemq.utils.UUIDGenerator;
 
+/**
+ * This is the class that will make the routing to Queues and decide which consumer will get the messages
+ * It's the queue component on distributing the messages * *
+ */
 public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
 {
    private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -605,30 +610,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       return addressManager.getBindings();
    }
 
-   public void route(final ServerMessage message, final boolean direct) throws Exception
+   public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception
    {
-      route(message, (Transaction) null, direct);
+      route(message, queueCreator, (Transaction) null, direct);
    }
 
-   public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception
+   public void route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception
    {
-      route(message, new RoutingContextImpl(tx), direct);
+      route(message, queueCreator, new RoutingContextImpl(tx), direct);
    }
 
    public void route(final ServerMessage message,
+                     final QueueCreator queueCreator,
                      final Transaction tx,
                      final boolean direct,
                      final boolean rejectDuplicates) throws Exception
    {
-      route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
+      route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
    }
 
-   public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
+   public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, final boolean direct) throws Exception
    {
-      route(message, context, direct, true);
+      route(message, queueCreator, context, direct, true);
    }
 
    public void route(final ServerMessage message,
+                     final QueueCreator queueCreator,
                      final RoutingContext context,
                      final boolean direct,
                      boolean rejectDuplicates) throws Exception
@@ -661,6 +668,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
 
+      // first check for the auto-queue creation thing
+      if (bindings == null && queueCreator != null)
+      {
+         // There is no queue with this address, we will check if it needs to be created
+         if (queueCreator.create(address))
+         {
+            // TODO: this is not working!!!!
+            // reassign bindings if it was created
+            bindings = addressManager.getBindingsForRoutingAddress(address);
+         }
+      }
+
       if (bindings != null)
       {
          bindings.route(message, context);
@@ -708,7 +727,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
                message.setAddress(dlaAddress);
 
-               route(message, context.getTransaction(), false);
+               route(message, null, context.getTransaction(), false);
             }
          }
          else
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
index fa3330efba..e6ae69f157 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -177,7 +177,7 @@ public class ActiveMQPacketHandler implements ChannelHandler
                                                       request.getDefaultAddress(),
                                                       new CoreSessionCallback(request.getName(),
                                                                               protocolManager,
-                                                                              channel), null);
+                                                                              channel), null, true);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
                                                                              server.getStorageManager(),
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
index 8888588368..fde78d447c 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
@@ -111,7 +111,8 @@ public interface ActiveMQServer extends ActiveMQComponent
                                boolean xa,
                                String defaultAddress,
                                SessionCallback callback,
-                               ServerSessionFactory sessionFactory) throws Exception;
+                               ServerSessionFactory sessionFactory,
+                               boolean autoCreateQueues) throws Exception;
 
    SecurityStore getSecurityStore();
 
@@ -142,6 +143,19 @@ public interface ActiveMQServer extends ActiveMQComponent
 
    boolean isActive();
 
+   /**
+    * This is the queue creator responsible for JMS Queue creations*
+    * @param queueCreator
+    */
+   void setJMSQueueCreator(QueueCreator queueCreator);
+
+   /**
+    * @see {@link org.apache.activemq.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)} *
+    * *
+    * @return
+    */
+   QueueCreator getJMSQueueCreator();
+
    /**
     * Wait for server initialization.
     * @param timeout
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java
new file mode 100644
index 0000000000..c1c272d132
--- /dev/null
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.core.server;
+
+import org.apache.activemq.api.core.SimpleString;
+
+public interface QueueCreator
+{
+   /**
+    *
+    * You should return true if you even tried to create the queue and the queue was already there.
+    * As the callers of this method will use that as an indicator that they should re-route the messages.
+    * *
+    * @return True if a queue was created.
+    */
+   boolean create(SimpleString address) throws Exception;
+}
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
index 363385f074..b599562e77 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
@@ -75,6 +75,8 @@ public interface ServerSession
 
    void xaSuspend() throws Exception;
 
+   QueueCreator getQueueCreator();
+
    List<Xid> xaGetInDoubtXids();
 
    int xaGetTimeout();
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
index 322458c45b..4efe63d178 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
@@ -40,6 +40,6 @@ public interface ServerSessionFactory
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
          SimpleString simpleString, SessionCallback callback,
-         OperationContext context) throws Exception;
+         QueueCreator queueCreator, OperationContext context) throws Exception;
 
 }
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
index 973f3e1290..4efb8f721d 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
@@ -99,6 +99,7 @@ import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MemoryManager;
 import org.apache.activemq.core.server.NodeManager;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueFactory;
 import org.apache.activemq.core.server.ServerSession;
 import org.apache.activemq.core.server.ServerSessionFactory;
@@ -222,6 +223,11 @@ public class ActiveMQServerImpl implements ActiveMQServer
 
    private MemoryManager memoryManager;
 
+   /**
+    * This will be set by the JMS Queue Manager.
+    */
+   private QueueCreator jmsQueueCreator;
+
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
    /**
@@ -593,6 +599,18 @@ public class ActiveMQServerImpl implements ActiveMQServer
       stop(failoverOnServerShutdown, false, false);
    }
 
+   @Override
+   public QueueCreator getJMSQueueCreator()
+   {
+      return jmsQueueCreator;
+   }
+
+   @Override
+   public void setJMSQueueCreator(QueueCreator jmsQueueCreator)
+   {
+      this.jmsQueueCreator = jmsQueueCreator;
+   }
+
    /**
     * Stops the server
     * @param criticalIOError          whether we have encountered an IO error with the journal etc
@@ -1007,6 +1025,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
       return backupManager;
    }
 
+   @Override
    public ServerSession createSession(final String name,
                                       final String username,
                                       final String password,
@@ -1018,7 +1037,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                       final boolean xa,
                                       final String defaultAddress,
                                       final SessionCallback callback,
-                                      final ServerSessionFactory sessionFactory) throws Exception
+                                      final ServerSessionFactory sessionFactory,
+                                      final boolean autoCreateQueues) throws Exception
    {
 
       if (securityStore != null)
@@ -1026,14 +1046,22 @@ public class ActiveMQServerImpl implements ActiveMQServer
          securityStore.authenticate(username, password);
       }
       final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
-      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory);
+      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
+                                                              connection, autoCommitSends, autoCommitAcks, preAcknowledge,
+                                                              xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues);
 
       sessions.put(name, session);
 
       return session;
    }
 
-   protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
+   protected ServerSessionImpl internalCreateSession(String name, String username,
+                                                     String password, int minLargeMessageSize,
+                                                     RemotingConnection connection, boolean autoCommitSends,
+                                                     boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
+                                                     String defaultAddress, SessionCallback callback,
+                                                     OperationContext context, ServerSessionFactory sessionFactory,
+                                                     boolean autoCreateJMSQueues) throws Exception
    {
       if (sessionFactory == null)
       {
@@ -1057,7 +1085,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                    defaultAddress == null ? null
                                       : new SimpleString(defaultAddress),
                                    callback,
-                                   context);
+                                   context,
+                                   autoCreateJMSQueues ? jmsQueueCreator : null);
       }
       else
       {
@@ -1081,6 +1110,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                    defaultAddress == null ? null
                                       : new SimpleString(defaultAddress),
                                    callback,
+                                   jmsQueueCreator,
                                    context);
       }
    }
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
index c0068f3928..b080351daf 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
@@ -110,7 +110,7 @@ public class DivertImpl implements Divert
          copy = message;
       }
 
-      postOffice.route(copy, context.getTransaction(), false);
+      postOffice.route(copy, null, context.getTransaction(), false);
    }
 
    @Override
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
index b0e4289fcd..c6f94d651d 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
@@ -2449,7 +2449,7 @@ public class QueueImpl implements Queue
 
       copyMessage.setAddress(toAddress);
 
-      postOffice.route(copyMessage, tx, false, rejectDuplicate);
+      postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
    }
@@ -2673,7 +2673,7 @@ public class QueueImpl implements Queue
 
       copyMessage.setAddress(address);
 
-      postOffice.route(copyMessage, tx, false, rejectDuplicate);
+      postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
 
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
index 4cfc0220eb..6b4939ee46 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
@@ -35,6 +35,7 @@ import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.api.core.Message;
 import org.apache.activemq.api.core.Pair;
 import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.management.CoreNotificationType;
 import org.apache.activemq.api.core.management.ManagementHelper;
 import org.apache.activemq.api.core.management.ResourceNames;
@@ -63,6 +64,7 @@ import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueQueryResult;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerConsumer;
@@ -154,6 +156,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    private final OperationContext context;
 
+   private QueueCreator queueCreator;
+
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
    protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
 
@@ -169,8 +173,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    private final TransactionFactory transactionFactory;
 
-   // Constructors ---------------------------------------------------------------------------------
-
    //create an 'empty' session. Only used by AMQServerSession
    //in order to check username and password
    protected ServerSessionImpl(String username, String password)
@@ -193,35 +195,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       this.managementAddress = null;
       this.context = null;
       this.callback = null;
-   }
-
-   public ServerSessionImpl(final String name,
-                            final String username,
-                            final String password,
-                            final int minLargeMessageSize,
-                            final boolean autoCommitSends,
-                            final boolean autoCommitAcks,
-                            final boolean preAcknowledge,
-                            final boolean strictUpdateDeliveryCount,
-                            final boolean xa,
-                            final RemotingConnection remotingConnection,
-                            final StorageManager storageManager,
-                            final PostOffice postOffice,
-                            final ResourceManager resourceManager,
-                            final SecurityStore securityStore,
-                            final ManagementService managementService,
-                            final ActiveMQServer server,
-                            final SimpleString managementAddress,
-                            final SimpleString defaultAddress,
-                            final SessionCallback callback,
-                            final OperationContext context) throws Exception
-   {
-      this(name, username, password, minLargeMessageSize,
-         autoCommitSends, autoCommitAcks, preAcknowledge,
-         strictUpdateDeliveryCount, xa, remotingConnection,
-         storageManager, postOffice, resourceManager, securityStore,
-         managementService, server, managementAddress, defaultAddress,
-         callback, context, null);
+      this.queueCreator = null;
    }
 
    public ServerSessionImpl(final String name,
@@ -244,7 +218,38 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
                             final OperationContext context,
-                            TransactionFactory transactionFactory) throws Exception
+                            final QueueCreator queueCreator) throws Exception
+   {
+      this(name, username, password, minLargeMessageSize,
+         autoCommitSends, autoCommitAcks, preAcknowledge,
+         strictUpdateDeliveryCount, xa, remotingConnection,
+         storageManager, postOffice, resourceManager, securityStore,
+         managementService, server, managementAddress, defaultAddress,
+         callback, context, null, queueCreator);
+   }
+
+   public ServerSessionImpl(final String name,
+                            final String username,
+                            final String password,
+                            final int minLargeMessageSize,
+                            final boolean autoCommitSends,
+                            final boolean autoCommitAcks,
+                            final boolean preAcknowledge,
+                            final boolean strictUpdateDeliveryCount,
+                            final boolean xa,
+                            final RemotingConnection remotingConnection,
+                            final StorageManager storageManager,
+                            final PostOffice postOffice,
+                            final ResourceManager resourceManager,
+                            final SecurityStore securityStore,
+                            final ManagementService managementService,
+                            final ActiveMQServer server,
+                            final SimpleString managementAddress,
+                            final SimpleString defaultAddress,
+                            final SessionCallback callback,
+                            final OperationContext context,
+                            TransactionFactory transactionFactory,
+                            final QueueCreator queueCreator) throws Exception
    {
       this.username = username;
 
@@ -288,6 +293,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       remotingConnection.addFailureListener(this);
       this.context = context;
 
+      this.queueCreator = queueCreator;
+
       if (transactionFactory == null)
       {
          this.transactionFactory = new DefaultTransactionFactory();
@@ -421,6 +428,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
    }
 
+
+
+   public QueueCreator getQueueCreator()
+   {
+      return queueCreator;
+   }
+
    public ServerConsumer createConsumer(final long consumerID,
                                         final SimpleString queueName,
                                         final SimpleString filterString,
@@ -1596,6 +1610,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       {
          data = metaData.get(key);
       }
+
+      if (key.equals(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY))
+      {
+         // we know it's a JMS Session, we now install JMS Hooks of any kind
+         installJMSHooks();
+      }
       return data;
    }
 
@@ -1709,16 +1729,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       connectionFailed(me, failedOver);
    }
 
-   // Public
-   // ----------------------------------------------------------------------------
-
    public void clearLargeMessage()
    {
       currentLargeMessage = null;
    }
 
-   // Private
-   // ----------------------------------------------------------------------------
+
+
+   private void installJMSHooks()
+   {
+      this.queueCreator = server.getJMSQueueCreator();
+   }
+
 
    private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses()
    {
@@ -1846,7 +1868,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       try
       {
-         postOffice.route(msg, routingContext, direct);
+         postOffice.route(msg, queueCreator, routingContext, direct);
 
          Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
 
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
index bef361ceae..6a5443881f 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
@@ -728,7 +728,7 @@ public class ManagementServiceImpl implements ManagementService
                                                         new SimpleString(notification.getUID()));
                }
 
-               postOffice.route(notificationMessage, false);
+               postOffice.route(notificationMessage, null, false);
             }
          }
       }
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
index 4c91e77f08..549f60c967 100644
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
+++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
@@ -175,7 +175,7 @@ public class IncomingVertxEventHandler implements ConnectorService
 
          try
          {
-            postOffice.route(msg, false);
+            postOffice.route(msg, null, false);
          }
          catch (Exception e)
          {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
index 53372d107e..d785ceb275 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
@@ -72,6 +72,38 @@ public class AutoCreateJmsQueueTest extends JMSTestBase
       connection.close();
    }
 
+   @Test
+   public void testAutoCreateOnSendToQueueAnonymousProducer() throws Exception
+   {
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
+
+      MessageProducer producer = session.createProducer(null);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         TextMessage mess = session.createTextMessage("msg" + i);
+         producer.send(queue, mess);
+      }
+
+      producer.close();
+
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         Message m = messageConsumer.receive(5000);
+         Assert.assertNotNull(m);
+      }
+
+      connection.close();
+   }
+
    @Test
    public void testAutoCreateOnSendToQueueSecurity() throws Exception
    {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
index 1b6349a94f..bc96752994 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
@@ -661,7 +661,7 @@ public class HangConsumerTest extends ServiceTestBase
       }
 
       @Override
-      protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
+      protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory, boolean autoCreateQueue) throws Exception
       {
          return new ServerSessionImpl(name,
             username,
@@ -683,7 +683,8 @@ public class HangConsumerTest extends ServiceTestBase
             defaultAddress == null ? null
                : new SimpleString(defaultAddress),
             new MyCallback(callback),
-            context);
+            context,
+            null);
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
index 0032e62c31..172958f4f7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.tests.integration.openwire;
 
+import javax.jms.ConnectionFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,10 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.ConnectionFactory;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
 import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.api.jms.management.JMSServerControl;
@@ -75,6 +74,7 @@ public class OpenWireTestBase extends ServiceTestBase
       Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
       String match = "jms.queue.#";
       AddressSettings dlaSettings = new AddressSettings();
+      dlaSettings.setAutoCreateJmsQueues(false);
       SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
       dlaSettings.setDeadLetterAddress(dla);
       addressSettings.put(match, dlaSettings);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
index 6b88682f01..588291ed8f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -293,6 +294,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
       assertTrue(message1.getText().equals(message.getText()));
    }
 
+   @Test
+   public void testAutoDestinationNoCreationOnConsumer() throws JMSException
+   {
+      AddressSettings addressSetting = new AddressSettings();
+      addressSetting.setAutoCreateJmsQueues(false);
+
+      String address = "foo";
+      server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
+
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      TextMessage message = session.createTextMessage("bar");
+      Queue queue = new ActiveMQQueue(address);
+
+      try
+      {
+         MessageConsumer consumer = session.createConsumer(queue);
+         Assert.fail("supposed to throw an exception here");
+      }
+      catch (JMSException e)
+      {
+
+      }
+   }
+
    /**
     * This is the example shipped with the distribution
     *
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
index 7141372da6..b47e20d423 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
@@ -16,26 +16,25 @@
  */
 package org.apache.activemq.tests.integration.openwire.amq;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.core.config.Configuration;
 import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.apache.activemq.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 70cdb7f04f..401d958433 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -29,6 +29,7 @@ import org.apache.activemq.core.postoffice.PostOffice;
 import org.apache.activemq.core.postoffice.impl.DuplicateIDCacheImpl;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.MessageReferenceImpl;
@@ -153,45 +154,27 @@ public class FakePostOffice implements PostOffice
       return new MessageReferenceImpl();
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct) throws Exception
    {
 
 
    }
 
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception
-   {
-
-
-   }
-
-   public void route(ServerMessage message, boolean direct) throws Exception
-   {
-
-
-   }
-
-   public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception
-   {
-
-
-   }
-
-   public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception
    {
 
 
    }
 
    @Override
-   public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
    {
 
 
    }
 
    @Override
-   public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
    {
 
 
@@ -200,8 +183,11 @@ public class FakePostOffice implements PostOffice
    @Override
    public void processRoute(ServerMessage message, RoutingContext context, boolean direct) throws Exception
    {
-
-
    }
 
+   @Override
+   public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception
+   {
+
+   }
 }
\ No newline at end of file