ACTIVEMQ6-13 auto-create/auto-delete jms queues

Implements a new feature for the broker whereby it may automatically
create and delete JMS queues which are not explicitly defined through
the management API or file-based configuration. A JMS queue is created
in response to a sent message or connected consumer. The queue may
subsequently be deleted when it no longer has any messages and
consumers. Auto-creation and auto-deletion can both be turned on/off
via address-setting.
This commit is contained in:
jbertram 2015-01-06 13:34:38 -06:00 committed by Justin Bertram
parent b388bf2397
commit e293d80f08
59 changed files with 1228 additions and 437 deletions

View File

@ -48,6 +48,12 @@ public interface ClientSession extends XAResource, AutoCloseable
* Returns the names of the queues bound to the binding.
*/
List<SimpleString> getQueueNames();
/**
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
* queue, <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();
}
/**
@ -81,6 +87,12 @@ public interface ClientSession extends XAResource, AutoCloseable
*/
boolean isDurable();
/**
* Returns <code>true</code> if auto-creation for this queue is enabled and if the queue queried is a JMS queue,
* <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();
/**
* Returns the number of consumers attached to the queue.
*/

View File

@ -565,7 +565,9 @@ public interface ActiveMQServerControl
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception;
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception;
void removeAddressSettings(String addressMatch) throws Exception;

View File

@ -60,6 +60,10 @@ public final class AddressSettingsInfo
private final String slowConsumerPolicy;
private final boolean autoCreateJmsQueues;
private final boolean autoDeleteJmsQueues;
// Static --------------------------------------------------------
public static AddressSettingsInfo from(final String jsonString) throws Exception
@ -80,7 +84,9 @@ public final class AddressSettingsInfo
object.getBoolean("sendToDLAOnNoRoute"),
object.getLong("slowConsumerThreshold"),
object.getLong("slowConsumerCheckPeriod"),
object.getString("slowConsumerPolicy"));
object.getString("slowConsumerPolicy"),
object.getBoolean("autoCreateJmsQueues"),
object.getBoolean("autoDeleteJmsQueues"));
}
// Constructors --------------------------------------------------
@ -100,7 +106,9 @@ public final class AddressSettingsInfo
boolean sendToDLAOnNoRoute,
long slowConsumerThreshold,
long slowConsumerCheckPeriod,
String slowConsumerPolicy)
String slowConsumerPolicy,
boolean autoCreateJmsQueues,
boolean autoDeleteJmsQueues)
{
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
@ -118,6 +126,8 @@ public final class AddressSettingsInfo
this.slowConsumerThreshold = slowConsumerThreshold;
this.slowConsumerCheckPeriod = slowConsumerCheckPeriod;
this.slowConsumerPolicy = slowConsumerPolicy;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
}
// Public --------------------------------------------------------
@ -206,5 +216,15 @@ public final class AddressSettingsInfo
{
return slowConsumerPolicy;
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public boolean isAutoDeleteJmsQueues()
{
return autoDeleteJmsQueues;
}
}

View File

@ -24,15 +24,17 @@ import org.apache.activemq.api.core.client.ClientSession;
public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSession.BindingQuery
{
private final boolean exists;
private final ArrayList<SimpleString> queueNames;
public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames)
private final boolean autoCreateJmsQueues;
public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{
this.exists = exists;
this.queueNames = new ArrayList<SimpleString>(queueNames);
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public List<SimpleString> getQueueNames()
@ -44,4 +46,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSessi
{
return exists;
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
}

View File

@ -38,6 +38,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
private final SimpleString name;
private final boolean autoCreateJmsQueues;
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
@ -47,7 +49,19 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
final SimpleString name,
final boolean exists)
{
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, false);
}
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
final long messageCount,
final SimpleString filterString,
final SimpleString address,
final SimpleString name,
final boolean exists,
final boolean autoCreateJmsQueues)
{
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@ -56,6 +70,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
this.address = address;
this.name = name;
this.exists = exists;
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public SimpleString getName()
@ -88,6 +103,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
return durable;
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public boolean isTemporary()
{
return temporary;

View File

@ -60,7 +60,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
@ -72,7 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualA
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@ -231,13 +231,11 @@ public class ActiveMQSessionContext extends SessionContext
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException
{
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
return response.toQueueQuery();
}
public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString,
int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
Executor executor, Executor flowControlExecutor) throws ActiveMQException
@ -252,7 +250,7 @@ public class ActiveMQSessionContext extends SessionContext
browseOnly,
true);
SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
@ -283,10 +281,10 @@ public class ActiveMQSessionContext extends SessionContext
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException
{
SessionBindingQueryResponseMessage response =
(SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
SessionBindingQueryResponseMessage_V2 response =
(SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
return new AddressQueryImpl(response.isExists(), response.getQueueNames());
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues());
}

View File

@ -161,6 +161,10 @@ public final class ChannelImpl implements Channel
return version >= 125;
case PacketImpl.DISCONNECT_V2:
return version >= 125;
case PacketImpl.SESS_QUEUEQUERY_RESP_V2:
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V2:
return version >= 126;
default:
return true;
}

View File

@ -39,6 +39,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_ME
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
@ -52,6 +53,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUC
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
@ -107,6 +109,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@ -120,6 +123,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCre
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@ -251,6 +255,11 @@ public abstract class PacketDecoder implements Serializable
packet = new SessionQueueQueryResponseMessage();
break;
}
case SESS_QUEUEQUERY_RESP_V2:
{
packet = new SessionQueueQueryResponseMessage_V2();
break;
}
case CREATE_QUEUE:
{
packet = new CreateQueueMessage();
@ -276,6 +285,11 @@ public abstract class PacketDecoder implements Serializable
packet = new SessionBindingQueryResponseMessage();
break;
}
case SESS_BINDINGQUERY_RESP_V2:
{
packet = new SessionBindingQueryResponseMessage_V2();
break;
}
case SESS_XA_START:
{
packet = new SessionXAStartMessage();

View File

@ -246,6 +246,10 @@ public class PacketImpl implements Packet
public static final byte SCALEDOWN_ANNOUNCEMENT = -6;
public static final byte SESS_QUEUEQUERY_RESP_V2 = -7;
public static final byte SESS_BINDINGQUERY_RESP_V2 = -8;
// Static --------------------------------------------------------
public PacketImpl(final byte type)

View File

@ -32,9 +32,9 @@ import org.apache.activemq.core.protocol.core.impl.PacketImpl;
*/
public class SessionBindingQueryResponseMessage extends PacketImpl
{
private boolean exists;
protected boolean exists;
private List<SimpleString> queueNames;
protected List<SimpleString> queueNames;
public SessionBindingQueryResponseMessage(final boolean exists, final List<SimpleString> queueNames)
{
@ -50,6 +50,11 @@ public class SessionBindingQueryResponseMessage extends PacketImpl
super(SESS_BINDINGQUERY_RESP);
}
public SessionBindingQueryResponseMessage(byte v2)
{
super(v2);
}
@Override
public boolean isResponse()
{

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.protocol.core.impl.wireformat;
import java.util.List;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.SimpleString;
/**
* @author Justin Bertram
*
*/
public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage
{
private boolean autoCreateJmsQueues;
public SessionBindingQueryResponseMessage_V2(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{
super(SESS_BINDINGQUERY_RESP_V2);
this.exists = exists;
this.queueNames = queueNames;
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public SessionBindingQueryResponseMessage_V2()
{
super(SESS_BINDINGQUERY_RESP_V2);
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer)
{
super.encodeRest(buffer);
buffer.writeBoolean(autoCreateJmsQueues);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer)
{
super.decodeRest(buffer);
autoCreateJmsQueues = buffer.readBoolean();
}
@Override
public int hashCode()
{
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreateJmsQueues ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionBindingQueryResponseMessage_V2))
return false;
SessionBindingQueryResponseMessage_V2 other = (SessionBindingQueryResponseMessage_V2)obj;
if (autoCreateJmsQueues != other.autoCreateJmsQueues)
return false;
return true;
}
}

View File

@ -32,21 +32,21 @@ import org.apache.activemq.core.server.QueueQueryResult;
*/
public class SessionQueueQueryResponseMessage extends PacketImpl
{
private SimpleString name;
protected SimpleString name;
private boolean exists;
protected boolean exists;
private boolean durable;
protected boolean durable;
private int consumerCount;
protected int consumerCount;
private long messageCount;
protected long messageCount;
private SimpleString filterString;
protected SimpleString filterString;
private SimpleString address;
protected SimpleString address;
private boolean temporary;
protected boolean temporary;
public SessionQueueQueryResponseMessage(final QueueQueryResult result)
{
@ -59,6 +59,11 @@ public class SessionQueueQueryResponseMessage extends PacketImpl
this(null, null, false, false, null, 0, 0, false);
}
public SessionQueueQueryResponseMessage(byte v2)
{
super(v2);
}
private SessionQueueQueryResponseMessage(final SimpleString name,
final SimpleString address,
final boolean durable,

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.protocol.core.impl.wireformat;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.core.client.impl.QueueQueryImpl;
import org.apache.activemq.core.server.QueueQueryResult;
/**
* @author Justin Bertram
*
*/
public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage
{
private boolean autoCreationEnabled;
public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result)
{
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues());
}
public SessionQueueQueryResponseMessage_V2()
{
this(null, null, false, false, null, 0, 0, false, false);
}
private SessionQueueQueryResponseMessage_V2(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean exists,
final boolean autoCreationEnabled)
{
super(SESS_QUEUEQUERY_RESP_V2);
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
this.messageCount = messageCount;
this.filterString = filterString;
this.address = address;
this.name = name;
this.exists = exists;
this.autoCreationEnabled = autoCreationEnabled;
}
public boolean isAutoCreationEnabled()
{
return autoCreationEnabled;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer)
{
super.encodeRest(buffer);
buffer.writeBoolean(autoCreationEnabled);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer)
{
super.decodeRest(buffer);
autoCreationEnabled = buffer.readBoolean();
}
@Override
public int hashCode()
{
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreationEnabled ? 1231 : 1237);
return result;
}
public ClientSession.QueueQuery toQueueQuery()
{
return new QueueQueryImpl(isDurable(),
isTemporary(),
getConsumerCount(),
getMessageCount(),
getFilterString(),
getAddress(),
getName(),
isExists(),
isAutoCreationEnabled());
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionQueueQueryResponseMessage_V2))
return false;
SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2)obj;
if (autoCreationEnabled != other.autoCreationEnabled)
return false;
return true;
}
}

View File

@ -43,29 +43,28 @@ public class QueueQueryResult
private boolean temporary;
private boolean autoCreateJmsQueues;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount)
final long messageCount,
final boolean autoCreateJmsQueues)
{
this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true);
}
public QueueQueryResult()
{
this(null, null, false, false, null, 0, 0, false);
}
private QueueQueryResult(final SimpleString name,
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean autoCreateJmsQueues,
final boolean exists)
{
this.durable = durable;
@ -82,6 +81,8 @@ public class QueueQueryResult
this.name = name;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.exists = exists;
}
@ -125,4 +126,9 @@ public class QueueQueryResult
return temporary;
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
}

View File

@ -21,4 +21,4 @@ activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionSuffix=${activemq.version.versionSuffix}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125
activemq.version.compatibleVersionList=121,122,123,124,125,126

View File

@ -415,7 +415,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientSession.AddressQuery query = clientSession.addressQuery(address);
if (!query.isExists())
{
throw new InvalidDestinationException("Destination " + address + " does not exist");
if (query.isAutoCreateJmsQueues())
{
clientSession.createQueue(address, address, true);
}
else
{
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
}
else
{

View File

@ -326,7 +326,14 @@ public class ActiveMQSession implements QueueSession, TopicSession
if (!response.isExists())
{
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
if (response.isAutoCreateJmsQueues())
{
session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
}
connection.addKnownDestination(jbd.getSimpleAddress());
@ -730,7 +737,14 @@ public class ActiveMQSession implements QueueSession, TopicSession
if (!response.isExists())
{
throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
if (response.isAutoCreateJmsQueues())
{
session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}
connection.addKnownDestination(dest.getSimpleAddress());
@ -902,10 +916,17 @@ public class ActiveMQSession implements QueueSession, TopicSession
try
{
AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress()));
if (!message.isExists())
AddressQuery response = session.addressQuery(new SimpleString(jbq.getAddress()));
if (!response.isExists())
{
throw new InvalidDestinationException(jbq.getAddress() + " does not exist");
if (response.isAutoCreateJmsQueues())
{
session.createQueue(jbq.getSimpleAddress(), jbq.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + jbq.getName() + " does not exist");
}
}
}
catch (ActiveMQException e)
@ -1239,13 +1260,13 @@ public class ActiveMQSession implements QueueSession, TopicSession
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (response.isExists())
if (!response.isExists() && !response.isAutoCreateJmsQueues())
{
return queue;
return null;
}
else
{
return null;
return queue;
}
}

View File

@ -152,6 +152,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy";
private static final String AUTO_CREATE_JMS_QUEUES = "auto-create-jms-queues";
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
// Attributes ----------------------------------------------------
private boolean validateAIO = false;
@ -1139,6 +1143,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, value);
addressSettings.setSlowConsumerPolicy(policy);
}
else if (AUTO_CREATE_JMS_QUEUES.equalsIgnoreCase(name))
{
addressSettings.setAutoCreateJmsQueues(XMLUtil.parseBoolean(child));
}
else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name))
{
addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child));
}
}
return setting;
}

View File

@ -1635,6 +1635,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
policy = addressSettings.getSlowConsumerPolicy() == SlowConsumerPolicy.NOTIFY ? "NOTIFY"
: "KILL";
settings.put("slowConsumerPolicy", policy);
settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues());
settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues());
JSONObject jsonObject = new JSONObject(settings);
return jsonObject.toString();
@ -1658,7 +1660,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final String addressFullMessagePolicy,
final long slowConsumerThreshold,
final long slowConsumerCheckPeriod,
final String slowConsumerPolicy) throws Exception
final String slowConsumerPolicy,
final boolean autoCreateJmsQueues,
final boolean autoDeleteJmsQueues) throws Exception
{
checkStarted();
@ -1721,6 +1725,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
{
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
}
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
server.getAddressSettingsRepository().addMatch(address, addressSettings);
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));

View File

@ -43,4 +43,6 @@ public interface QueueBindingInfo
SimpleString getFilterString();
boolean isAutoCreated();
}

View File

@ -2014,7 +2014,8 @@ public class JournalStorageManager implements StorageManager
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
binding.getAddress(),
filterString);
filterString,
queue.isAutoCreated());
readLock();
try
@ -3027,6 +3028,8 @@ public class JournalStorageManager implements StorageManager
public SimpleString filterString;
public boolean autoCreated;
public PersistentQueueBindingEncoding()
{
}
@ -3041,16 +3044,20 @@ public class JournalStorageManager implements StorageManager
address +
", filterString=" +
filterString +
", autoCreated=" +
autoCreated +
"]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
final SimpleString filterString)
final SimpleString filterString,
final boolean autoCreated)
{
this.name = name;
this.address = address;
this.filterString = filterString;
this.autoCreated = autoCreated;
}
public long getId()
@ -3083,11 +3090,17 @@ public class JournalStorageManager implements StorageManager
return name;
}
public boolean isAutoCreated()
{
return autoCreated;
}
public void decode(final ActiveMQBuffer buffer)
{
name = buffer.readSimpleString();
address = buffer.readSimpleString();
filterString = buffer.readNullableSimpleString();
autoCreated = buffer.readBoolean();
}
public void encode(final ActiveMQBuffer buffer)
@ -3095,12 +3108,13 @@ public class JournalStorageManager implements StorageManager
buffer.writeSimpleString(name);
buffer.writeSimpleString(address);
buffer.writeNullableSimpleString(filterString);
buffer.writeBoolean(autoCreated);
}
public int getEncodeSize()
{
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString);
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN;
}
}

View File

@ -72,6 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@ -81,6 +82,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionForceConsum
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@ -230,7 +232,15 @@ public class ServerSessionPacketHandler implements ChannelHandler
{
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2))
{
response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
}
else
{
response = new SessionQueueQueryResponseMessage(queueQueryResult);
}
}
break;
@ -277,7 +287,14 @@ public class ServerSessionPacketHandler implements ChannelHandler
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
response = new SessionQueueQueryResponseMessage(result);
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2))
{
response = new SessionQueueQueryResponseMessage_V2(result);
}
else
{
response = new SessionQueueQueryResponseMessage(result);
}
break;
}
case SESS_BINDINGQUERY:
@ -285,7 +302,14 @@ public class ServerSessionPacketHandler implements ChannelHandler
requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2))
{
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
}
else
{
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
}
break;
}
case SESS_ACKNOWLEDGE:

View File

@ -177,6 +177,13 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean durable,
boolean temporary) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
Queue deployQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,

View File

@ -1366,4 +1366,8 @@ public interface ActiveMQServerLogger extends BasicLogger
@Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy Configuration. Please use <ha-policy> exclusively or remove. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT)
void incompatibleWithHAPolicyChosen(String parameter);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224065, value = "Failed to remove auto-created queue {0}", format = Message.Format.MESSAGE_FORMAT)
void errorRemovingAutoCreatedQueue(@Cause Exception e, SimpleString bindingName);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.server;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.utils.ReferenceCounter;
/**
* @author Clebert Suconic
*/
public interface AutoCreatedQueueManager extends ReferenceCounter
{
SimpleString getQueueName();
}

View File

@ -33,11 +33,15 @@ public class BindingQueryResult
private List<SimpleString> queueNames;
public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames)
private boolean autoCreateJmsQueues;
public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{
this.exists = exists;
this.queueNames = queueNames;
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public boolean isExists()
@ -45,6 +49,11 @@ public class BindingQueryResult
return exists;
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public List<SimpleString> getQueueNames()
{
return queueNames;

View File

@ -49,6 +49,8 @@ public interface Queue extends Bindable
boolean isTemporary();
boolean isAutoCreated();
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);
@ -62,7 +64,7 @@ public interface Queue extends Bindable
* on shared subscriptions where the queue needs to be deleted when all the
* consumers are closed.
*/
void setConsumersRefCount(ActiveMQServer server);
void setConsumersRefCount(ReferenceCounter referenceCounter);
ReferenceCounter getConsumersRefCount();

View File

@ -39,7 +39,8 @@ public interface QueueFactory
Filter filter,
PageSubscription pageSubscription,
boolean durable,
boolean temporary);
boolean temporary,
boolean autoCreated);
/**
* This is required for delete-all-reference to work correctly with paging

View File

@ -1193,9 +1193,18 @@ public class ActiveMQServerImpl implements ActiveMQServer
final boolean durable,
final boolean temporary) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, false, false);
return createQueue(address, queueName, filterString, durable, temporary, false, false, false);
}
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated);
}
/**
* Creates a transient queue. A queue that will exist as long as there are consumers.
@ -1214,7 +1223,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
final SimpleString filterString,
boolean durable) throws Exception
{
Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable);
Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false);
if (!queue.getAddress().equals(address))
{
@ -1263,7 +1272,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, filterString, durable, temporary, true, false);
return createQueue(address, queueName, filterString, durable, temporary, true, false, false);
}
public void destroyQueue(final SimpleString queueName) throws Exception
@ -1981,7 +1990,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
final boolean durable,
final boolean temporary,
final boolean ignoreIfExists,
final boolean transientQueue) throws Exception
final boolean transientQueue,
final boolean autoCreated) throws Exception
{
QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
@ -2021,11 +2031,16 @@ public class ActiveMQServerImpl implements ActiveMQServer
filter,
pageSubscription,
durable,
temporary);
temporary,
autoCreated);
if (transientQueue)
{
queue.setConsumersRefCount(this);
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
}
else if (autoCreated)
{
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queueName));
}
binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.server.impl;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.AutoCreatedQueueManager;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.utils.ReferenceCounterUtil;
/**
* @author Clebert Suconic
*/
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager
{
private final SimpleString queueName;
private final ActiveMQServer server;
private final Runnable runnable = new Runnable()
{
public void run()
{
try
{
Queue queue = server.locateQueue(queueName);
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (server.locateQueue(queueName).getMessageCount() == 0)
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount);
}
server.destroyQueue(queueName, null, false);
}
else if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("NOT deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount);
}
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName)
{
this.server = server;
this.queueName = queueName;
}
@Override
public int increment()
{
return referenceCounterUtil.increment();
}
@Override
public int decrement()
{
return referenceCounterUtil.decrement();
}
@Override
public SimpleString getQueueName()
{
return queueName;
}
}

View File

@ -55,6 +55,7 @@ public class LastValueQueue extends QueueImpl
final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@ -68,6 +69,7 @@ public class LastValueQueue extends QueueImpl
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,

View File

@ -46,6 +46,7 @@ import org.apache.activemq.core.postoffice.Binding;
import org.apache.activemq.core.postoffice.DuplicateIDCache;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.NodeManager;
@ -155,7 +156,13 @@ public class PostOfficeJournalLoader implements JournalLoader
filter,
subscription,
true,
false);
false,
queueBindingInfo.isAutoCreated());
if (queueBindingInfo.isAutoCreated())
{
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
}
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());

View File

@ -75,7 +75,8 @@ public class QueueFactoryImpl implements QueueFactory
final Filter filter,
final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary)
final boolean temporary,
final boolean autoCreated)
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@ -89,6 +90,7 @@ public class QueueFactoryImpl implements QueueFactory
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,
@ -104,6 +106,7 @@ public class QueueFactoryImpl implements QueueFactory
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,

View File

@ -61,7 +61,6 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.HandleStatus;
import org.apache.activemq.core.server.ActiveMQMessageBundle;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext;
@ -131,6 +130,8 @@ public class QueueImpl implements Queue
private final boolean temporary;
private final boolean autoCreated;
private final PostOffice postOffice;
private volatile boolean queueDestroyed = false;
@ -315,6 +316,7 @@ public class QueueImpl implements Queue
final Filter filter,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@ -328,6 +330,7 @@ public class QueueImpl implements Queue
null,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,
@ -342,6 +345,7 @@ public class QueueImpl implements Queue
final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@ -362,6 +366,8 @@ public class QueueImpl implements Queue
this.temporary = temporary;
this.autoCreated = autoCreated;
this.postOffice = postOffice;
this.storageManager = storageManager;
@ -425,11 +431,11 @@ public class QueueImpl implements Queue
}
// Queue implementation ----------------------------------------------------------------------------------------
public synchronized void setConsumersRefCount(final ActiveMQServer server)
public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter)
{
if (refCountForConsumers == null)
{
this.refCountForConsumers = new TransientQueueManagerImpl(server, this.name);
this.refCountForConsumers = referenceCounter;
}
}
@ -449,6 +455,11 @@ public class QueueImpl implements Queue
return temporary;
}
public boolean isAutoCreated()
{
return autoCreated;
}
public SimpleString getName()
{
return name;

View File

@ -37,6 +37,7 @@ import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.management.CoreNotificationType;
import org.apache.activemq.api.core.management.ManagementHelper;
import org.apache.activemq.api.core.management.ResourceNames;
import org.apache.activemq.core.client.impl.ClientMessageImpl;
import org.apache.activemq.core.exception.ActiveMQXAException;
import org.apache.activemq.core.filter.Filter;
@ -549,7 +550,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
Queue queue = server.createQueue(address, name, filterString, durable, temporary);
// any non-temporary JMS queue created via this method should be marked as auto-created
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
{
server.createQueue(address, name, filterString, durable, temporary, true);
}
else
{
server.createQueue(address, name, filterString, durable, temporary);
}
if (temporary)
{
@ -676,6 +685,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
{
boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
if (name == null)
{
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
@ -699,16 +710,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener
queue.isTemporary(),
filterString,
queue.getConsumerCount(),
queue.getMessageCount());
queue.getMessageCount(),
autoCreateJmsQueues);
}
// make an exception for the management address (see HORNETQ-29)
else if (name.equals(managementAddress))
{
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1);
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
}
else if (autoCreateJmsQueues)
{
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
}
else
{
response = new QueueQueryResult();
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
}
return response;
@ -716,6 +732,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception
{
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
if (address == null)
{
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
@ -726,7 +744,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
// make an exception for the management address (see HORNETQ-29)
if (address.equals(managementAddress))
{
return new BindingQueryResult(true, names);
return new BindingQueryResult(true, names, autoCreateJmsQueues);
}
Bindings bindings = postOffice.getMatchingBindings(address);
@ -739,7 +757,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
}
}
return new BindingQueryResult(!names.isEmpty(), names);
return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
}
public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception

View File

@ -56,6 +56,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
public static final boolean DEFAULT_AUTO_CREATE_QUEUES = true;
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
public static final long DEFAULT_EXPIRY_DELAY = -1;
@ -106,6 +110,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private SlowConsumerPolicy slowConsumerPolicy = null;
private Boolean autoCreateJmsQueues = null;
private Boolean autoDeleteJmsQueues = null;
public AddressSettings(AddressSettings other)
{
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
@ -127,6 +135,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.slowConsumerThreshold = other.slowConsumerThreshold;
this.slowConsumerCheckPeriod = other.slowConsumerCheckPeriod;
this.slowConsumerPolicy = other.slowConsumerPolicy;
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
}
public AddressSettings()
@ -134,6 +144,26 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues != null ? autoCreateJmsQueues : AddressSettings.DEFAULT_AUTO_CREATE_QUEUES;
}
public void setAutoCreateJmsQueues(final boolean autoCreateJmsQueues)
{
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public boolean isAutoDeleteJmsQueues()
{
return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES;
}
public void setAutoDeleteJmsQueues(final boolean autoDeleteJmsQueues)
{
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
}
public boolean isLastValueQueue()
{
return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE;
@ -398,6 +428,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
{
slowConsumerPolicy = merged.slowConsumerPolicy;
}
if (autoCreateJmsQueues == null)
{
autoCreateJmsQueues = merged.autoCreateJmsQueues;
}
if (autoDeleteJmsQueues == null)
{
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
}
}
@Override
@ -458,6 +496,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
{
slowConsumerPolicy = null;
}
autoCreateJmsQueues = BufferHelper.readNullableBoolean(buffer);
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
}
@Override
@ -482,7 +524,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) +
BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) +
BufferHelper.sizeOfNullableLong(slowConsumerThreshold) +
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null);
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues);
}
@Override
@ -526,6 +570,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, slowConsumerCheckPeriod);
buffer.writeNullableSimpleString(slowConsumerPolicy != null ? new SimpleString(slowConsumerPolicy.toString()) : null);
BufferHelper.writeNullableBoolean(buffer, autoCreateJmsQueues);
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
}
/* (non-Javadoc)
@ -556,6 +604,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerThreshold == null) ? 0 : slowConsumerThreshold.hashCode());
result = prime * result + ((slowConsumerCheckPeriod == null) ? 0 : slowConsumerCheckPeriod.hashCode());
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
return result;
}
@ -705,6 +755,20 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!slowConsumerPolicy.equals(other.slowConsumerPolicy))
return false;
if (autoCreateJmsQueues == null)
{
if (other.autoCreateJmsQueues != null)
return false;
}
else if (!autoCreateJmsQueues.equals(other.autoCreateJmsQueues))
return false;
if (autoDeleteJmsQueues == null)
{
if (other.autoDeleteJmsQueues != null)
return false;
}
else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
return false;
return true;
}
@ -751,6 +815,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
slowConsumerCheckPeriod +
", slowConsumerPolicy=" +
slowConsumerPolicy +
", autoCreateJmsQueues=" +
autoCreateJmsQueues +
", autoDeleteJmsQueues=" +
autoDeleteJmsQueues +
"]";
}
}

View File

@ -2258,6 +2258,23 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-create-jms-queues" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to automatically create JMS queues when a producer sends or a consumer connects to a
queue
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-delete-jms-queues" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to delete auto-created JMS queues when the queue has 0 consumers and 0 messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">

View File

@ -297,6 +297,8 @@ public class FileConfigurationTest extends ConfigurationImplTest
assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold());
assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
@ -308,6 +310,8 @@ public class FileConfigurationTest extends ConfigurationImplTest
assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold());
assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
assertEquals(2, conf.getQueueConfigurations().size());

View File

@ -42,7 +42,6 @@ import org.apache.activemq.core.message.BodyEncoder;
import org.apache.activemq.core.paging.PagingStore;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext;
@ -1053,6 +1052,12 @@ public class ScheduledDeliveryHandlerTest extends Assert
return false;
}
@Override
public boolean isAutoCreated()
{
return false;
}
@Override
public void addConsumer(Consumer consumer) throws Exception
{
@ -1072,7 +1077,7 @@ public class ScheduledDeliveryHandlerTest extends Assert
}
@Override
public void setConsumersRefCount(ActiveMQServer server)
public void setConsumersRefCount(ReferenceCounter referenceCounter)
{
}

View File

@ -46,7 +46,8 @@ public class AddressSettingsTest extends UnitTestCase
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues());
}
@Test

View File

@ -271,6 +271,8 @@
<slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-check-period>5</slow-consumer-check-period>
<slow-consumer-policy>NOTIFY</slow-consumer-policy>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-delete-jms-queues>true</auto-delete-jms-queues>
</address-setting>
<address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address>
@ -283,6 +285,8 @@
<slow-consumer-threshold>20</slow-consumer-threshold>
<slow-consumer-check-period>15</slow-consumer-check-period>
<slow-consumer-policy>KILL</slow-consumer-policy>
<auto-create-jms-queues>false</auto-create-jms-queues>
<auto-delete-jms-queues>false</auto-delete-jms-queues>
</address-setting>
</address-settings>
<connector-services>

View File

@ -95,6 +95,8 @@ entry that would be found in the `activemq-configuration.xml` file.
<slow-consumer-threshold>-1</slow-consumer-threshold>
<slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-check-period>5</slow-consumer-check-period>
<auto-create-queues>true</auto-create-queues>
<auto-delete-queues>true</auto-delete-queues>
</address-setting>
</address-settings>
@ -176,3 +178,13 @@ on this notification.
`slow-consumer-check-period`. How often to check for slow consumers on a
particular queue. Measured in minutes. Default is 5. See ? for more
information about slow consumer detection.
`auto-create-jms-queues`. Whether or not the broker should automatically
create a JMS queue when a JMS message is sent to a queue whose name fits
the address `match` (remember, a JMS queue is just a core queue which has
the same address and queue name) or a JMS consumer tries to connect to a
queue whose name fits the address `match`. Queues which are auto-created
are durable, non-temporary, and non-transient.
`auto-delete-jms-queues`. Whether or not to the broker should automatically
delete auto-created JMS queues when they have both 0 consumers and 0 messages.

View File

@ -49,7 +49,7 @@
<activemq.version.majorVersion>6</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix>
<activemq.version.versionTag>SNAPSHOT</activemq.version.versionTag>
<ActiveMQ-Version>

View File

@ -77,6 +77,7 @@ public class QueueTest extends UnitTestCase
null,
null,
false,
false,
false);
FakeConsumer consumer = new FakeConsumer();

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.client;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.security.Role;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author Justin Bertram
*/
public class AutoCreateJmsQueueTest extends JMSTestBase
{
@Test
public void testAutoCreateOnSendToQueue() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++)
{
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
connection.close();
}
@Test
public void testAutoCreateOnSendToQueueSecurity() throws Exception
{
server.getSecurityManager().addUser("guest", "guest");
server.getSecurityManager().setDefaultUser("guest");
server.getSecurityManager().addRole("guest", "rejectAll");
Role role = new Role("rejectAll", false, false, false, false, false, false, false);
Set<Role> roles = new HashSet<Role>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
try
{
MessageProducer producer = session.createProducer(queue);
Assert.fail("Creating a producer here should throw a JMSSecurityException");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof JMSSecurityException);
}
connection.close();
}
@Test
public void testAutoCreateOnSendToTopic() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
try
{
MessageProducer producer = session.createProducer(topic);
Assert.fail("Creating a producer here should throw an exception");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof InvalidDestinationException);
}
connection.close();
}
@Test
public void testAutoCreateOnConsumeFromQueue() throws Exception
{
Connection connection = null;
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
Message m = messageConsumer.receive(500);
Assert.assertNull(m);
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable();
Assert.assertEquals(0, q.getMessageCount());
Assert.assertEquals(0, q.getMessagesAdded());
connection.close();
}
@Test
public void testAutoCreateOnConsumeFromTopic() throws Exception
{
Connection connection = null;
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
try
{
MessageConsumer messageConsumer = session.createConsumer(topic);
Assert.fail("Creating a consumer here should throw an exception");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof InvalidDestinationException);
}
connection.close();
}
@Before
@Override
public void setUp() throws Exception
{
super.setUp();
server.getSecurityManager().addUser("guest", "guest");
server.getSecurityManager().setDefaultUser("guest");
server.getSecurityManager().addRole("guest", "allowAll");
Role role = new Role("allowAll", true, true, true, true, true, true, true);
Set<Role> roles = new HashSet<Role>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
}
protected boolean useSecurity()
{
return true;
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.client;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Test;
/**
* @author Justin Bertram
*/
public class AutoDeleteJmsQueueTest extends JMSTestBase
{
@Test
public void testAutoDelete() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages - 1; i++)
{
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
session.close();
// ensure the queue is still there
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable();
Assert.assertEquals(1, q.getMessageCount());
Assert.assertEquals(numMessages, q.getMessagesAdded());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = session.createConsumer(queue);
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
connection.close();
// ensure the queue was removed
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test")));
}
}

View File

@ -242,6 +242,7 @@ public class HangConsumerTest extends ServiceTestBase
final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@ -255,6 +256,7 @@ public class HangConsumerTest extends ServiceTestBase
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,
@ -294,7 +296,8 @@ public class HangConsumerTest extends ServiceTestBase
final Filter filter,
final PageSubscription pageSubscription,
final boolean durable,
final boolean temporary)
final boolean temporary,
final boolean autoCreated)
{
queue = new MyQueueWithBlocking(persistenceID,
address,
@ -303,6 +306,7 @@ public class HangConsumerTest extends ServiceTestBase
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,
@ -401,7 +405,7 @@ public class HangConsumerTest extends ServiceTestBase
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, null, null, null, null, null), server.getNodeID());
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);

View File

@ -506,6 +506,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
PageSubscription pageSubscription,
boolean durable,
boolean temporary,
boolean autoCreated,
ScheduledExecutorService scheduledExecutor,
PostOffice postOffice,
StorageManager storageManager,
@ -519,6 +520,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,
@ -570,7 +572,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
Filter filter,
PageSubscription pageSubscription,
boolean durable,
boolean temporary)
boolean temporary,
boolean autoCreated)
{
return new NoPostACKQueue(persistenceID,
@ -580,6 +583,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
pageSubscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
storageManager,

View File

@ -749,7 +749,9 @@ public class PagingOrderTest extends ServiceTestBase
"PAGE",
-1,
10,
"KILL");
"KILL",
true,
true);
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
new TransportConfiguration(INVM_CONNECTOR_FACTORY));
@ -824,7 +826,9 @@ public class PagingOrderTest extends ServiceTestBase
"PAGE",
-1,
10,
"KILL");
"KILL",
true,
true);
jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");

View File

@ -82,7 +82,7 @@ public class TopicCleanupTest extends JMSTestBase
{
long txid = storage.generateID();
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, server.getScheduledPool(), server.getPostOffice(),
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(),
storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());

View File

@ -62,7 +62,7 @@ public class NonExistentQueueTest extends JMSTestBase
@Test
public void sendToNonExistantDestination() throws Exception
{
Destination destination = ActiveMQJMSClient.createQueue("DoesNotExist");
Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist");
TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName());
ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
transportConfiguration);

View File

@ -507,6 +507,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
long slowConsumerThreshold = 5;
long slowConsumerCheckPeriod = 10;
String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString();
boolean autoCreateJmsQueues = false;
boolean autoDeleteJmsQueues = false;
serverControl.addAddressSettings(addressMatch,
DLA,
@ -525,7 +527,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy);
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
boolean ex = false;
@ -548,7 +552,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy);
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
}
catch (Exception expected)
{
@ -578,6 +584,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
serverControl.addAddressSettings(addressMatch,
DLA,
@ -596,7 +604,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy);
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
@ -618,6 +628,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
ex = false;
@ -640,7 +652,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy);
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
}
catch (Exception e)
{

View File

@ -582,7 +582,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception
{
proxy.invokeOperation("addAddressSettings",
addressMatch,
@ -602,7 +604,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
addressFullMessagePolicy,
slowConsumerThreshold,
slowConsumerCheckPeriod,
slowConsumerPolicy);
slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
}
public void removeAddressSettings(String addressMatch) throws Exception

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.tests.integration.persistence;
import org.junit.Ignore;
import org.junit.Test;
import java.io.StringReader;
@ -47,37 +48,40 @@ public class ExportFormatTest extends ServiceTestBase
// Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
String bindingsFile = "#File,JournalFileImpl: (activemq-bindings-1.bindings id = 1, recordID = 1)\n" +
"operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" +
"operation@AddRecord,id@2,userRecordType@21,length@17,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAA=\n" +
"operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" +
"#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)";
"operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" +
"operation@AddRecordTX,txID@2,id@3,userRecordType@21,length@18,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAAA\n" +
"operation@Commit,txID@2,numberOfRecords@1\n" +
"operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" +
"#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)";
// Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
String journalFile = "#File,JournalFileImpl: (activemq-data-1.amq id = 1, recordID = 1)\n" +
"operation@AddRecordTX,txID@0,id@4,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP40EAQAAAAEAAAAGawBlAHkABgAAAAA=\n" +
"operation@UpdateTX,txID@0,id@4,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" +
"operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" +
"operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" +
"operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" +
"operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@Commit,txID@0,numberOfRecords@10\n" +
"operation@AddRecord,id@12,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6gEAQAAAAEAAAAGawBlAHkABgAAAAU=\n" +
"operation@Update,id@12,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6oEAQAAAAEAAAAGawBlAHkABgAAAAY=\n" +
"operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6sEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" +
"operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP60EAQAAAAEAAAAGawBlAHkABgAAAAg=\n" +
"operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP64EAQAAAAEAAAAGawBlAHkABgAAAAk=\n" +
"operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" +
"#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)";
"operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfYEAQAAAAEAAAAGawBlAHkABgAAAAA=\n" +
"operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" +
"operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" +
"operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" +
"operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@9,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" +
"operation@UpdateTX,txID@0,id@9,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@Commit,txID@0,numberOfRecords@10\n" +
"operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg0EAQAAAAEAAAAGawBlAHkABgAAAAU=\n" +
"operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg8EAQAAAAEAAAAGawBlAHkABgAAAAY=\n" +
"operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChMEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" +
"operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChcEAQAAAAEAAAAGawBlAHkABgAAAAg=\n" +
"operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@17,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChoEAQAAAAEAAAAGawBlAHkABgAAAAk=\n" +
"operation@Update,id@17,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)";
public void _testCreateFormat() throws Exception
@Test
@Ignore // use this to recreate the format above. Notice we can't change the record format between releases
public void testCreateFormat() throws Exception
{
ActiveMQServer server = createServer(true);
server.start();
@ -86,7 +90,7 @@ public class ExportFormatTest extends ServiceTestBase
ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, false, false);
session.createQueue("A1", "A1");
session.createQueue("A1", "A1", true);
ClientProducer producer = session.createProducer("A1");
for (int i = 0; i < 5; i++)

View File

@ -34,6 +34,7 @@ import javax.jms.XAConnection;
import javax.jms.XASession;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.Test;
@ -113,6 +114,10 @@ public class SessionTest extends ActiveMQServerTestCase
@Test
public void testCreateNonExistentQueue() throws Exception
{
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings);
Connection conn = getConnectionFactory().createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
@ -147,6 +152,10 @@ public class SessionTest extends ActiveMQServerTestCase
@Test
public void testCreateQueueWhileTopicWithSameNameExists() throws Exception
{
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings);
Connection conn = getConnectionFactory().createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
try

View File

@ -81,6 +81,7 @@ public class QueueImplTest extends UnitTestCase
null,
false,
true,
false,
scheduledExecutor,
null,
null,
@ -158,6 +159,7 @@ public class QueueImplTest extends UnitTestCase
null,
false,
true,
false,
scheduledExecutor,
null,
null,
@ -273,6 +275,7 @@ public class QueueImplTest extends UnitTestCase
null,
false,
true,
false,
scheduledExecutor,
null,
null,

View File

@ -25,7 +25,6 @@ import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.filter.Filter;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext;
@ -60,7 +59,7 @@ public class FakeQueue implements Queue
}
@Override
public void setConsumersRefCount(ActiveMQServer server)
public void setConsumersRefCount(ReferenceCounter referenceCounter)
{
}
@ -427,6 +426,12 @@ public class FakeQueue implements Queue
return false;
}
@Override
public boolean isAutoCreated()
{
return false;
}
@Override
public LinkedListIterator<MessageReference> iterator()
{

View File

@ -94,17 +94,7 @@ public class QueueImplTest extends UnitTestCase
{
final SimpleString name = new SimpleString("oobblle");
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
name,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getNamedQueue(name);
Assert.assertEquals(name, queue.getName());
}
@ -112,31 +102,11 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testDurable()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
false,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getNonDurableQueue();
Assert.assertFalse(queue.isDurable());
queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
true,
false,
scheduledExecutor,
null,
null,
null,
executor);
queue = getDurableQueue();
Assert.assertTrue(queue.isDurable());
}
@ -150,17 +120,7 @@ public class QueueImplTest extends UnitTestCase
Consumer cons3 = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
Assert.assertEquals(0, queue.getConsumerCount());
@ -202,17 +162,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testGetFilter()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
Assert.assertNull(queue.getFilter());
@ -229,17 +179,7 @@ public class QueueImplTest extends UnitTestCase
}
};
queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
filter,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
queue = getFilteredQueue(filter);
Assert.assertEquals(filter, queue.getFilter());
@ -248,17 +188,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testSimpleadd()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -278,17 +208,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testRate() throws InterruptedException
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -309,17 +229,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testSimpleNonDirectDelivery() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -358,17 +268,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testBusyConsumer() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
FakeConsumer consumer = new FakeConsumer();
@ -413,17 +313,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
FakeConsumer consumer = new FakeConsumer();
@ -491,17 +381,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testaddHeadadd() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -556,17 +436,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testChangeConsumersAndDeliver() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -730,17 +600,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testRoundRobinWithQueueing() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -793,17 +653,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testWithPriorities() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
@ -856,17 +706,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testConsumerWithFilterAddAndRemove()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
Filter filter = new FakeFilter("fruit", "orange");
@ -876,17 +716,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testIterator()
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 20;
@ -925,17 +755,7 @@ public class QueueImplTest extends UnitTestCase
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
Filter filter = new FakeFilter("fruit", "orange");
@ -1009,17 +829,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@ -1061,17 +871,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@ -1146,17 +946,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@ -1220,17 +1010,7 @@ public class QueueImplTest extends UnitTestCase
private void testConsumerWithFilters(final boolean direct) throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
Filter filter = new FakeFilter("fruit", "orange");
@ -1323,17 +1103,7 @@ public class QueueImplTest extends UnitTestCase
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@ -1354,17 +1124,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testMessagesAdded() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@ -1377,17 +1137,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testGetReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@ -1401,17 +1151,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testGetNonExistentReference() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@ -1430,17 +1170,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testPauseAndResumeWithAsync() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
// pauses the queue
queue.pause();
@ -1498,17 +1228,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testPauseAndResumeWithDirect() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
@ -1553,17 +1273,7 @@ public class QueueImplTest extends UnitTestCase
@Test
public void testResetMessagesAdded() throws Exception
{
QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
queue.addTail(messageReference);
@ -1668,4 +1378,45 @@ public class QueueImplTest extends UnitTestCase
server.stop();
}
}
private QueueImpl getNonDurableQueue()
{
return getQueue(QueueImplTest.queue1, false, false, null);
}
private QueueImpl getDurableQueue()
{
return getQueue(QueueImplTest.queue1, true, false, null);
}
private QueueImpl getNamedQueue(SimpleString name)
{
return getQueue(name, false, true, null);
}
private QueueImpl getFilteredQueue(Filter filter)
{
return getQueue(QueueImplTest.queue1, false, true, filter);
}
private QueueImpl getTemporaryQueue()
{
return getQueue(QueueImplTest.queue1, false, true, null);
}
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter)
{
return new QueueImpl(1,
QueueImplTest.address1,
name,
filter,
durable,
temporary,
false,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
}
}

View File

@ -49,7 +49,8 @@ public class FakeQueueFactory implements QueueFactory
final Filter filter,
final PageSubscription subscription,
final boolean durable,
final boolean temporary)
final boolean temporary,
final boolean autoCreated)
{
return new QueueImpl(persistenceID,
address,
@ -58,6 +59,7 @@ public class FakeQueueFactory implements QueueFactory
subscription,
durable,
temporary,
autoCreated,
scheduledExecutor,
postOffice,
null,