Queue Auto-create fixes on OpenWire

this is basically addressing a performance issue on OpenWire, setting the auto-create to the PostOffice
after not being able to route

The core protocol stays the same in regard to the auto-create since the exceptions are happening after the queueQuery
This commit is contained in:
Clebert Suconic 2015-04-21 18:00:05 -04:00
parent aa638197ce
commit 95b6328993
31 changed files with 372 additions and 165 deletions

View File

@ -27,6 +27,25 @@ import org.apache.activemq.api.core.SimpleString;
*/
public interface ClientSession extends XAResource, AutoCloseable
{
/**
* This is used to identify a ClientSession as used by the JMS Layer
* The JMS Layer will add this through Meta-data, so the server or management layers
* can identify session created over core API purely or through the JMS Layer
*/
String JMS_SESSION_IDENTIFIER_PROPERTY = "jms-session";
/**
* Just like {@link org.apache.activemq.api.core.client.ClientSession.AddressQuery#JMS_SESSION_IDENTIFIER_PROPERTY} this is
* used to identify the ClientID over JMS Session.
* However this is only used when the JMS Session.clientID is set (which is optional).
* With this property management tools and the server can identify the jms-client-id used over JMS
*/
String JMS_SESSION_CLIENT_ID_PROPERTY = "jms-client-id";
/**
* Information returned by a binding query
*

View File

@ -230,7 +230,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
try
{
initialSession.addUniqueMetaData("jms-client-id", clientID);
initialSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
catch (ActiveMQException e)
{
@ -732,10 +732,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private void addSessionMetaData(ClientSession session) throws ActiveMQException
{
session.addMetaData("jms-session", "");
session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
if (clientID != null)
{
session.addMetaData("jms-client-id", clientID);
session.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
}

View File

@ -409,17 +409,13 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
try
{
ClientSession.AddressQuery query = clientSession.addressQuery(address);
if (!query.isExists())
{
if (query.isAutoCreateJmsQueues())
{
clientSession.createQueue(address, address, true);
}
else
// if it's autoCreateJMSQueue we will let the PostOffice.route to execute the creation at the server's side
// as that's a more efficient path for such operation
if (!query.isExists() && !query.isAutoCreateJmsQueues())
{
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
}
else
{
connection.addKnownDestination(address);

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.management.Parameter;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.api.jms.management.ConnectionFactoryControl;
@ -737,9 +738,10 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
// First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller
for (ServerSession session : sessions)
{
if (session.getMetaData("jms-session") != null)
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null)
{
jmsSessions.put(session.getConnectionID(), session);
}
@ -754,7 +756,8 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
obj.put("connectionID", connection.getID().toString());
obj.put("clientAddress", connection.getRemoteAddress());
obj.put("creationTime", connection.getCreationTime());
obj.put("clientID", session.getMetaData("jms-client-id"));
// Notice: this will be null when the user haven't set the client-id
obj.put("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY));
obj.put("principal", session.getUsername());
array.put(obj);
}
@ -986,7 +989,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
public String closeConnectionWithClientID(final String clientID) throws Exception
{
return server.getActiveMQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID);
return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
private JSONObject toJSONObject(ServerConsumer consumer) throws Exception

View File

@ -18,7 +18,6 @@ package org.apache.activemq.jms.server.impl;
import javax.naming.NamingException;
import javax.transaction.xa.Xid;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -50,6 +49,7 @@ import org.apache.activemq.core.security.Role;
import org.apache.activemq.core.server.ActivateCallback;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.core.server.management.Notification;
import org.apache.activemq.core.settings.impl.AddressSettings;
@ -62,9 +62,9 @@ import org.apache.activemq.jms.client.ActiveMQQueue;
import org.apache.activemq.jms.client.ActiveMQTopic;
import org.apache.activemq.jms.client.SelectorTranslator;
import org.apache.activemq.jms.persistence.JMSStorageManager;
import org.apache.activemq.jms.persistence.config.PersistedBindings;
import org.apache.activemq.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.jms.persistence.config.PersistedDestination;
import org.apache.activemq.jms.persistence.config.PersistedBindings;
import org.apache.activemq.jms.persistence.config.PersistedType;
import org.apache.activemq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
import org.apache.activemq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
@ -400,6 +400,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
return;
}
server.setJMSQueueCreator(new JMSQueueCreator());
server.registerActivateCallback(this);
/**
* See this method's javadoc.
@ -491,6 +493,16 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
final boolean durable,
final String... bindings) throws Exception
{
return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings);
}
protected boolean internalCreateJMSQueue(final boolean storeConfig,
final String queueName,
final String selectorString,
final boolean durable,
final boolean autoCreated,
final String... bindings) throws Exception
{
if (active && queues.get(queueName) != null)
{
@ -1881,4 +1893,29 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
}
}
class JMSQueueCreator implements QueueCreator
{
private final SimpleString PREFIX = SimpleString.toSimpleString("jms.queue");
@Override
public boolean create(SimpleString address) throws Exception
{
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
if (address.startsWith(PREFIX) && settings.isAutoCreateJmsQueues())
{
// stopped here... finish here
JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(PREFIX.toString().length() + 1), null, true, true);
return true;
}
else
{
return false;
}
}
}
}

View File

@ -105,7 +105,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
true, //boolean xa,
(String) null,
this,
null);
null,
true);
}
@Override

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.core.protocol.openwire;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.jms.ResourceAllocationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -31,15 +34,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.api.core.ActiveMQSecurityException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerInfo;
@ -74,19 +74,6 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.core.server.QueueQueryResult;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.protocol.openwire.amq.AMQBrokerStoppedException;
import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
@ -101,10 +88,21 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
import org.apache.activemq.core.remoting.CloseListener;
import org.apache.activemq.core.remoting.FailureListener;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.utils.ConcurrentHashSet;
import org.apache.activemq.wireformat.WireFormat;
/**
* Represents an activemq connection.
@ -1403,12 +1401,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
if (producerExchange.canDispatch(messageSend))
{
if (messageSend.getDestination().isQueue())
{
SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
autoCreateQueueIfPossible(queueName, session);
}
SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
if (result.isBlockNextSend())
{
@ -1458,15 +1450,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
return resp;
}
public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
{
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isAutoCreateJmsQueues() && !result.isExists())
{
session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
}
}
private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
{
AMQProducerBrokerExchange result = producerExchanges.get(id);

View File

@ -44,6 +44,7 @@ import org.apache.activemq.core.server.ActiveMQMessageBundle;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.ServerConsumer;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
@ -72,6 +73,7 @@ public class AMQServerSession extends ServerSessionImpl
SecurityStore securityStore, ManagementService managementService,
ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
SimpleString simpleString, SessionCallback callback,
QueueCreator queueCreator,
OperationContext context) throws Exception
{
super(name, username, password,
@ -83,7 +85,8 @@ public class AMQServerSession extends ServerSessionImpl
securityStore, managementService,
activeMQServerImpl, managementAddress,
simpleString, callback,
context, new AMQTransactionFactory());
context, new AMQTransactionFactory(),
queueCreator);
}
//create a fake session just for security check
@ -387,7 +390,7 @@ public class AMQServerSession extends ServerSessionImpl
try
{
postOffice.route(msg, routingContext, direct);
postOffice.route(msg, getQueueCreator(), routingContext, direct);
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());

View File

@ -21,6 +21,7 @@ import org.apache.activemq.core.persistence.OperationContext;
import org.apache.activemq.core.persistence.StorageManager;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.security.SecurityStore;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.ServerSessionFactory;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.core.server.impl.ServerSessionImpl;
@ -41,13 +42,13 @@ public class AMQServerSessionFactory implements ServerSessionFactory
PostOffice postOffice, ResourceManager resourceManager,
SecurityStore securityStore, ManagementService managementService,
ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
SimpleString simpleString, SessionCallback callback,
SimpleString simpleString, SessionCallback callback, QueueCreator queueCreator,
OperationContext context) throws Exception
{
return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends,
autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa,
connection, storageManager, postOffice, resourceManager, securityStore,
managementService, activeMQServerImpl, managementAddress, simpleString, callback,
managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator,
context);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.core.protocol.openwire.amq;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@ -28,8 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.transaction.xa.Xid;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
@ -46,9 +46,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter;
@ -56,12 +53,14 @@ import org.apache.activemq.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.core.protocol.openwire.SendingResult;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.ServerConsumer;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.impl.ServerMessageImpl;
import org.apache.activemq.core.transaction.impl.XidImpl;
import org.apache.activemq.spi.core.protocol.SessionCallback;
import org.apache.activemq.spi.core.remoting.ReadyListener;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback
{
@ -109,7 +108,7 @@ public class AMQSession implements SessionCallback
{
coreSession = (AMQServerSession) server.createSession(name, username, password,
minLargeMessageSize, connection, true, false, false, false,
null, this, new AMQServerSessionFactory());
null, this, new AMQServerSessionFactory(), true);
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1)
@ -143,7 +142,7 @@ public class AMQSession implements SessionCallback
if (d.isQueue())
{
SimpleString queueName = OpenWireUtil.toCoreAddress(d);
connection.autoCreateQueueIfPossible(queueName, this);
getCoreServer().getJMSQueueCreator().create(queueName);
}
AMQConsumer consumer = new AMQConsumer(this, d, info);
consumer.init();

View File

@ -274,7 +274,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
false,
false,
null,
stompSession, null);
stompSession, null, true);
stompSession.setServerSession(session);
sessions.put(connection.getID(), stompSession);
}
@ -299,7 +299,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
false,
false,
null,
stompSession, null);
stompSession, null, true);
stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession);
}

View File

@ -561,11 +561,11 @@ public class ActiveMQActivation
spec.getTransactionTimeout());
result.addMetaData("resource-adapter", "inbound");
result.addMetaData("jms-session", "");
result.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID();
if (clientID != null)
{
result.addMetaData("jms-client-id", clientID);
result.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
}
ActiveMQRALogger.LOGGER.debug("Using queue connection " + result);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.server.ActiveMQComponent;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.RoutingContext;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.transaction.Transaction;
@ -66,15 +67,15 @@ public interface PostOffice extends ActiveMQComponent
Map<SimpleString, Binding> getAllBindings();
void route(ServerMessage message, boolean direct) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct) throws Exception;
void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;

View File

@ -61,6 +61,7 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.LargeServerMessage;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.QueueFactory;
import org.apache.activemq.core.server.RouteContextList;
import org.apache.activemq.core.server.RoutingContext;
@ -81,6 +82,10 @@ import org.apache.activemq.core.transaction.impl.TransactionImpl;
import org.apache.activemq.utils.TypedProperties;
import org.apache.activemq.utils.UUIDGenerator;
/**
* This is the class that will make the routing to Queues and decide which consumer will get the messages
* It's the queue component on distributing the messages * *
*/
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
{
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -605,30 +610,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.getBindings();
}
public void route(final ServerMessage message, final boolean direct) throws Exception
public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception
{
route(message, (Transaction) null, direct);
route(message, queueCreator, (Transaction) null, direct);
}
public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception
public void route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception
{
route(message, new RoutingContextImpl(tx), direct);
route(message, queueCreator, new RoutingContextImpl(tx), direct);
}
public void route(final ServerMessage message,
final QueueCreator queueCreator,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception
{
route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, final boolean direct) throws Exception
{
route(message, context, direct, true);
route(message, queueCreator, context, direct, true);
}
public void route(final ServerMessage message,
final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception
@ -661,6 +668,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
// first check for the auto-queue creation thing
if (bindings == null && queueCreator != null)
{
// There is no queue with this address, we will check if it needs to be created
if (queueCreator.create(address))
{
// TODO: this is not working!!!!
// reassign bindings if it was created
bindings = addressManager.getBindingsForRoutingAddress(address);
}
}
if (bindings != null)
{
bindings.route(message, context);
@ -708,7 +727,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
route(message, context.getTransaction(), false);
route(message, null, context.getTransaction(), false);
}
}
else

View File

@ -177,7 +177,7 @@ public class ActiveMQPacketHandler implements ChannelHandler
request.getDefaultAddress(),
new CoreSessionCallback(request.getName(),
protocolManager,
channel), null);
channel), null, true);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
server.getStorageManager(),

View File

@ -111,7 +111,8 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean xa,
String defaultAddress,
SessionCallback callback,
ServerSessionFactory sessionFactory) throws Exception;
ServerSessionFactory sessionFactory,
boolean autoCreateQueues) throws Exception;
SecurityStore getSecurityStore();
@ -142,6 +143,19 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean isActive();
/**
* This is the queue creator responsible for JMS Queue creations*
* @param queueCreator
*/
void setJMSQueueCreator(QueueCreator queueCreator);
/**
* @see {@link org.apache.activemq.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)} *
* *
* @return
*/
QueueCreator getJMSQueueCreator();
/**
* Wait for server initialization.
* @param timeout

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.server;
import org.apache.activemq.api.core.SimpleString;
public interface QueueCreator
{
/**
*
* You should return true if you even tried to create the queue and the queue was already there.
* As the callers of this method will use that as an indicator that they should re-route the messages.
* *
* @return True if a queue was created.
*/
boolean create(SimpleString address) throws Exception;
}

View File

@ -75,6 +75,8 @@ public interface ServerSession
void xaSuspend() throws Exception;
QueueCreator getQueueCreator();
List<Xid> xaGetInDoubtXids();
int xaGetTimeout();

View File

@ -40,6 +40,6 @@ public interface ServerSessionFactory
SecurityStore securityStore, ManagementService managementService,
ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
SimpleString simpleString, SessionCallback callback,
OperationContext context) throws Exception;
QueueCreator queueCreator, OperationContext context) throws Exception;
}

View File

@ -99,6 +99,7 @@ import org.apache.activemq.core.server.LargeServerMessage;
import org.apache.activemq.core.server.MemoryManager;
import org.apache.activemq.core.server.NodeManager;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.QueueFactory;
import org.apache.activemq.core.server.ServerSession;
import org.apache.activemq.core.server.ServerSessionFactory;
@ -222,6 +223,11 @@ public class ActiveMQServerImpl implements ActiveMQServer
private MemoryManager memoryManager;
/**
* This will be set by the JMS Queue Manager.
*/
private QueueCreator jmsQueueCreator;
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
/**
@ -593,6 +599,18 @@ public class ActiveMQServerImpl implements ActiveMQServer
stop(failoverOnServerShutdown, false, false);
}
@Override
public QueueCreator getJMSQueueCreator()
{
return jmsQueueCreator;
}
@Override
public void setJMSQueueCreator(QueueCreator jmsQueueCreator)
{
this.jmsQueueCreator = jmsQueueCreator;
}
/**
* Stops the server
* @param criticalIOError whether we have encountered an IO error with the journal etc
@ -1007,6 +1025,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
return backupManager;
}
@Override
public ServerSession createSession(final String name,
final String username,
final String password,
@ -1018,7 +1037,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
final boolean xa,
final String defaultAddress,
final SessionCallback callback,
final ServerSessionFactory sessionFactory) throws Exception
final ServerSessionFactory sessionFactory,
final boolean autoCreateQueues) throws Exception
{
if (securityStore != null)
@ -1026,14 +1046,22 @@ public class ActiveMQServerImpl implements ActiveMQServer
securityStore.authenticate(username, password);
}
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory);
final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
connection, autoCommitSends, autoCommitAcks, preAcknowledge,
xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues);
sessions.put(name, session);
return session;
}
protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
protected ServerSessionImpl internalCreateSession(String name, String username,
String password, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends,
boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
String defaultAddress, SessionCallback callback,
OperationContext context, ServerSessionFactory sessionFactory,
boolean autoCreateJMSQueues) throws Exception
{
if (sessionFactory == null)
{
@ -1057,7 +1085,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
defaultAddress == null ? null
: new SimpleString(defaultAddress),
callback,
context);
context,
autoCreateJMSQueues ? jmsQueueCreator : null);
}
else
{
@ -1081,6 +1110,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
defaultAddress == null ? null
: new SimpleString(defaultAddress),
callback,
jmsQueueCreator,
context);
}
}

View File

@ -110,7 +110,7 @@ public class DivertImpl implements Divert
copy = message;
}
postOffice.route(copy, context.getTransaction(), false);
postOffice.route(copy, null, context.getTransaction(), false);
}
@Override

View File

@ -2449,7 +2449,7 @@ public class QueueImpl implements Queue
copyMessage.setAddress(toAddress);
postOffice.route(copyMessage, tx, false, rejectDuplicate);
postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
acknowledge(tx, ref);
}
@ -2673,7 +2673,7 @@ public class QueueImpl implements Queue
copyMessage.setAddress(address);
postOffice.route(copyMessage, tx, false, rejectDuplicate);
postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
acknowledge(tx, ref);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.api.core.Message;
import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.management.CoreNotificationType;
import org.apache.activemq.api.core.management.ManagementHelper;
import org.apache.activemq.api.core.management.ResourceNames;
@ -63,6 +64,7 @@ import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.LargeServerMessage;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.QueueQueryResult;
import org.apache.activemq.core.server.RoutingContext;
import org.apache.activemq.core.server.ServerConsumer;
@ -154,6 +156,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
private final OperationContext context;
private QueueCreator queueCreator;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
@ -169,8 +173,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener
private final TransactionFactory transactionFactory;
// Constructors ---------------------------------------------------------------------------------
//create an 'empty' session. Only used by AMQServerSession
//in order to check username and password
protected ServerSessionImpl(String username, String password)
@ -193,35 +195,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
this.managementAddress = null;
this.context = null;
this.callback = null;
}
public ServerSessionImpl(final String name,
final String username,
final String password,
final int minLargeMessageSize,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
final RemotingConnection remotingConnection,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
final ManagementService managementService,
final ActiveMQServer server,
final SimpleString managementAddress,
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context) throws Exception
{
this(name, username, password, minLargeMessageSize,
autoCommitSends, autoCommitAcks, preAcknowledge,
strictUpdateDeliveryCount, xa, remotingConnection,
storageManager, postOffice, resourceManager, securityStore,
managementService, server, managementAddress, defaultAddress,
callback, context, null);
this.queueCreator = null;
}
public ServerSessionImpl(final String name,
@ -244,7 +218,38 @@ public class ServerSessionImpl implements ServerSession, FailureListener
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context,
TransactionFactory transactionFactory) throws Exception
final QueueCreator queueCreator) throws Exception
{
this(name, username, password, minLargeMessageSize,
autoCommitSends, autoCommitAcks, preAcknowledge,
strictUpdateDeliveryCount, xa, remotingConnection,
storageManager, postOffice, resourceManager, securityStore,
managementService, server, managementAddress, defaultAddress,
callback, context, null, queueCreator);
}
public ServerSessionImpl(final String name,
final String username,
final String password,
final int minLargeMessageSize,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
final RemotingConnection remotingConnection,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
final ManagementService managementService,
final ActiveMQServer server,
final SimpleString managementAddress,
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context,
TransactionFactory transactionFactory,
final QueueCreator queueCreator) throws Exception
{
this.username = username;
@ -288,6 +293,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
remotingConnection.addFailureListener(this);
this.context = context;
this.queueCreator = queueCreator;
if (transactionFactory == null)
{
this.transactionFactory = new DefaultTransactionFactory();
@ -421,6 +428,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener
}
}
public QueueCreator getQueueCreator()
{
return queueCreator;
}
public ServerConsumer createConsumer(final long consumerID,
final SimpleString queueName,
final SimpleString filterString,
@ -1596,6 +1610,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
{
data = metaData.get(key);
}
if (key.equals(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY))
{
// we know it's a JMS Session, we now install JMS Hooks of any kind
installJMSHooks();
}
return data;
}
@ -1709,16 +1729,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
connectionFailed(me, failedOver);
}
// Public
// ----------------------------------------------------------------------------
public void clearLargeMessage()
{
currentLargeMessage = null;
}
// Private
// ----------------------------------------------------------------------------
private void installJMSHooks()
{
this.queueCreator = server.getJMSQueueCreator();
}
private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses()
{
@ -1846,7 +1868,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
try
{
postOffice.route(msg, routingContext, direct);
postOffice.route(msg, queueCreator, routingContext, direct);
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());

View File

@ -728,7 +728,7 @@ public class ManagementServiceImpl implements ManagementService
new SimpleString(notification.getUID()));
}
postOffice.route(notificationMessage, false);
postOffice.route(notificationMessage, null, false);
}
}
}

View File

@ -175,7 +175,7 @@ public class IncomingVertxEventHandler implements ConnectorService
try
{
postOffice.route(msg, false);
postOffice.route(msg, null, false);
}
catch (Exception e)
{

View File

@ -72,6 +72,38 @@ public class AutoCreateJmsQueueTest extends JMSTestBase
connection.close();
}
@Test
public void testAutoCreateOnSendToQueueAnonymousProducer() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageProducer producer = session.createProducer(null);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(queue, mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++)
{
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
connection.close();
}
@Test
public void testAutoCreateOnSendToQueueSecurity() throws Exception
{

View File

@ -661,7 +661,7 @@ public class HangConsumerTest extends ServiceTestBase
}
@Override
protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory, boolean autoCreateQueue) throws Exception
{
return new ServerSessionImpl(name,
username,
@ -683,7 +683,8 @@ public class HangConsumerTest extends ServiceTestBase
defaultAddress == null ? null
: new SimpleString(defaultAddress),
new MyCallback(callback),
context);
context,
null);
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.tests.integration.openwire;
import javax.jms.ConnectionFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -23,10 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.ConnectionFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.jms.management.JMSServerControl;
@ -75,6 +74,7 @@ public class OpenWireTestBase extends ServiceTestBase
Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
String match = "jms.queue.#";
AddressSettings dlaSettings = new AddressSettings();
dlaSettings.setAutoCreateJmsQueues(false);
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
dlaSettings.setDeadLetterAddress(dla);
addressSettings.put(match, dlaSettings);

View File

@ -32,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -293,6 +294,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
assertTrue(message1.getText().equals(message.getText()));
}
@Test
public void testAutoDestinationNoCreationOnConsumer() throws JMSException
{
AddressSettings addressSetting = new AddressSettings();
addressSetting.setAutoCreateJmsQueues(false);
String address = "foo";
server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage message = session.createTextMessage("bar");
Queue queue = new ActiveMQQueue(address);
try
{
MessageConsumer consumer = session.createConsumer(queue);
Assert.fail("supposed to throw an exception here");
}
catch (JMSException e)
{
}
}
/**
* This is the example shipped with the distribution
*

View File

@ -16,26 +16,25 @@
*/
package org.apache.activemq.tests.integration.openwire.amq;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

View File

@ -29,6 +29,7 @@ import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.QueueCreator;
import org.apache.activemq.core.server.RoutingContext;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.impl.MessageReferenceImpl;
@ -153,45 +154,27 @@ public class FakePostOffice implements PostOffice
return new MessageReferenceImpl();
}
public void route(final ServerMessage message, final Transaction tx) throws Exception
public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct) throws Exception
{
}
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
}
public void route(ServerMessage message, boolean direct) throws Exception
{
}
public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception
{
}
public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception
public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception
{
}
@Override
public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
{
}
@Override
public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
{
@ -200,8 +183,11 @@ public class FakePostOffice implements PostOffice
@Override
public void processRoute(ServerMessage message, RoutingContext context, boolean direct) throws Exception
{
}
@Override
public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception
{
}
}