This closes #60 on auto-create queues

This commit is contained in:
Clebert Suconic 2015-01-13 16:40:29 -05:00
commit c98937c089
59 changed files with 1228 additions and 437 deletions

View File

@ -48,6 +48,12 @@ public interface ClientSession extends XAResource, AutoCloseable
* Returns the names of the queues bound to the binding. * Returns the names of the queues bound to the binding.
*/ */
List<SimpleString> getQueueNames(); List<SimpleString> getQueueNames();
/**
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
* queue, <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();
} }
/** /**
@ -81,6 +87,12 @@ public interface ClientSession extends XAResource, AutoCloseable
*/ */
boolean isDurable(); boolean isDurable();
/**
* Returns <code>true</code> if auto-creation for this queue is enabled and if the queue queried is a JMS queue,
* <code>false</code> else.
*/
boolean isAutoCreateJmsQueues();
/** /**
* Returns the number of consumers attached to the queue. * Returns the number of consumers attached to the queue.
*/ */

View File

@ -565,7 +565,9 @@ public interface ActiveMQServerControl
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception; @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception;
void removeAddressSettings(String addressMatch) throws Exception; void removeAddressSettings(String addressMatch) throws Exception;

View File

@ -60,6 +60,10 @@ public final class AddressSettingsInfo
private final String slowConsumerPolicy; private final String slowConsumerPolicy;
private final boolean autoCreateJmsQueues;
private final boolean autoDeleteJmsQueues;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
public static AddressSettingsInfo from(final String jsonString) throws Exception public static AddressSettingsInfo from(final String jsonString) throws Exception
@ -80,7 +84,9 @@ public final class AddressSettingsInfo
object.getBoolean("sendToDLAOnNoRoute"), object.getBoolean("sendToDLAOnNoRoute"),
object.getLong("slowConsumerThreshold"), object.getLong("slowConsumerThreshold"),
object.getLong("slowConsumerCheckPeriod"), object.getLong("slowConsumerCheckPeriod"),
object.getString("slowConsumerPolicy")); object.getString("slowConsumerPolicy"),
object.getBoolean("autoCreateJmsQueues"),
object.getBoolean("autoDeleteJmsQueues"));
} }
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
@ -100,7 +106,9 @@ public final class AddressSettingsInfo
boolean sendToDLAOnNoRoute, boolean sendToDLAOnNoRoute,
long slowConsumerThreshold, long slowConsumerThreshold,
long slowConsumerCheckPeriod, long slowConsumerCheckPeriod,
String slowConsumerPolicy) String slowConsumerPolicy,
boolean autoCreateJmsQueues,
boolean autoDeleteJmsQueues)
{ {
this.addressFullMessagePolicy = addressFullMessagePolicy; this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes; this.maxSizeBytes = maxSizeBytes;
@ -118,6 +126,8 @@ public final class AddressSettingsInfo
this.slowConsumerThreshold = slowConsumerThreshold; this.slowConsumerThreshold = slowConsumerThreshold;
this.slowConsumerCheckPeriod = slowConsumerCheckPeriod; this.slowConsumerCheckPeriod = slowConsumerCheckPeriod;
this.slowConsumerPolicy = slowConsumerPolicy; this.slowConsumerPolicy = slowConsumerPolicy;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
} }
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@ -206,5 +216,15 @@ public final class AddressSettingsInfo
{ {
return slowConsumerPolicy; return slowConsumerPolicy;
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public boolean isAutoDeleteJmsQueues()
{
return autoDeleteJmsQueues;
}
} }

View File

@ -24,15 +24,17 @@ import org.apache.activemq.api.core.client.ClientSession;
public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSession.BindingQuery public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSession.BindingQuery
{ {
private final boolean exists; private final boolean exists;
private final ArrayList<SimpleString> queueNames; private final ArrayList<SimpleString> queueNames;
public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames) private final boolean autoCreateJmsQueues;
public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{ {
this.exists = exists; this.exists = exists;
this.queueNames = new ArrayList<SimpleString>(queueNames); this.queueNames = new ArrayList<SimpleString>(queueNames);
this.autoCreateJmsQueues = autoCreateJmsQueues;
} }
public List<SimpleString> getQueueNames() public List<SimpleString> getQueueNames()
@ -44,4 +46,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSessi
{ {
return exists; return exists;
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
} }

View File

@ -38,6 +38,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
private final SimpleString name; private final SimpleString name;
private final boolean autoCreateJmsQueues;
public QueueQueryImpl(final boolean durable, public QueueQueryImpl(final boolean durable,
final boolean temporary, final boolean temporary,
final int consumerCount, final int consumerCount,
@ -47,7 +49,19 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
final SimpleString name, final SimpleString name,
final boolean exists) final boolean exists)
{ {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, false);
}
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
final long messageCount,
final SimpleString filterString,
final SimpleString address,
final SimpleString name,
final boolean exists,
final boolean autoCreateJmsQueues)
{
this.durable = durable; this.durable = durable;
this.temporary = temporary; this.temporary = temporary;
this.consumerCount = consumerCount; this.consumerCount = consumerCount;
@ -56,6 +70,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
this.address = address; this.address = address;
this.name = name; this.name = name;
this.exists = exists; this.exists = exists;
this.autoCreateJmsQueues = autoCreateJmsQueues;
} }
public SimpleString getName() public SimpleString getName()
@ -88,6 +103,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
return durable; return durable;
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public boolean isTemporary() public boolean isTemporary()
{ {
return temporary; return temporary;

View File

@ -60,7 +60,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
@ -72,7 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualA
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@ -231,13 +231,11 @@ public class ActiveMQSessionContext extends SessionContext
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException
{ {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
return response.toQueueQuery(); return response.toQueueQuery();
} }
public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString,
int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
Executor executor, Executor flowControlExecutor) throws ActiveMQException Executor executor, Executor flowControlExecutor) throws ActiveMQException
@ -252,7 +250,7 @@ public class ActiveMQSessionContext extends SessionContext
browseOnly, browseOnly,
true); true);
SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
// The actual windows size that gets used is determined by the user since // The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings // could be overridden on the queue settings
@ -283,10 +281,10 @@ public class ActiveMQSessionContext extends SessionContext
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException
{ {
SessionBindingQueryResponseMessage response = SessionBindingQueryResponseMessage_V2 response =
(SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
return new AddressQueryImpl(response.isExists(), response.getQueueNames()); return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues());
} }

View File

@ -161,6 +161,10 @@ public final class ChannelImpl implements Channel
return version >= 125; return version >= 125;
case PacketImpl.DISCONNECT_V2: case PacketImpl.DISCONNECT_V2:
return version >= 125; return version >= 125;
case PacketImpl.SESS_QUEUEQUERY_RESP_V2:
return version >= 126;
case PacketImpl.SESS_BINDINGQUERY_RESP_V2:
return version >= 126;
default: default:
return true; return true;
} }

View File

@ -39,6 +39,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_ME
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CLOSE; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_COMMIT; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
@ -52,6 +53,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUC
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP_V2;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
@ -107,6 +109,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCommitMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@ -120,6 +123,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCre
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
@ -251,6 +255,11 @@ public abstract class PacketDecoder implements Serializable
packet = new SessionQueueQueryResponseMessage(); packet = new SessionQueueQueryResponseMessage();
break; break;
} }
case SESS_QUEUEQUERY_RESP_V2:
{
packet = new SessionQueueQueryResponseMessage_V2();
break;
}
case CREATE_QUEUE: case CREATE_QUEUE:
{ {
packet = new CreateQueueMessage(); packet = new CreateQueueMessage();
@ -276,6 +285,11 @@ public abstract class PacketDecoder implements Serializable
packet = new SessionBindingQueryResponseMessage(); packet = new SessionBindingQueryResponseMessage();
break; break;
} }
case SESS_BINDINGQUERY_RESP_V2:
{
packet = new SessionBindingQueryResponseMessage_V2();
break;
}
case SESS_XA_START: case SESS_XA_START:
{ {
packet = new SessionXAStartMessage(); packet = new SessionXAStartMessage();

View File

@ -246,6 +246,10 @@ public class PacketImpl implements Packet
public static final byte SCALEDOWN_ANNOUNCEMENT = -6; public static final byte SCALEDOWN_ANNOUNCEMENT = -6;
public static final byte SESS_QUEUEQUERY_RESP_V2 = -7;
public static final byte SESS_BINDINGQUERY_RESP_V2 = -8;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
public PacketImpl(final byte type) public PacketImpl(final byte type)

View File

@ -32,9 +32,9 @@ import org.apache.activemq.core.protocol.core.impl.PacketImpl;
*/ */
public class SessionBindingQueryResponseMessage extends PacketImpl public class SessionBindingQueryResponseMessage extends PacketImpl
{ {
private boolean exists; protected boolean exists;
private List<SimpleString> queueNames; protected List<SimpleString> queueNames;
public SessionBindingQueryResponseMessage(final boolean exists, final List<SimpleString> queueNames) public SessionBindingQueryResponseMessage(final boolean exists, final List<SimpleString> queueNames)
{ {
@ -50,6 +50,11 @@ public class SessionBindingQueryResponseMessage extends PacketImpl
super(SESS_BINDINGQUERY_RESP); super(SESS_BINDINGQUERY_RESP);
} }
public SessionBindingQueryResponseMessage(byte v2)
{
super(v2);
}
@Override @Override
public boolean isResponse() public boolean isResponse()
{ {

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.protocol.core.impl.wireformat;
import java.util.List;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.SimpleString;
/**
* @author Justin Bertram
*
*/
public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage
{
private boolean autoCreateJmsQueues;
public SessionBindingQueryResponseMessage_V2(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{
super(SESS_BINDINGQUERY_RESP_V2);
this.exists = exists;
this.queueNames = queueNames;
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public SessionBindingQueryResponseMessage_V2()
{
super(SESS_BINDINGQUERY_RESP_V2);
}
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer)
{
super.encodeRest(buffer);
buffer.writeBoolean(autoCreateJmsQueues);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer)
{
super.decodeRest(buffer);
autoCreateJmsQueues = buffer.readBoolean();
}
@Override
public int hashCode()
{
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreateJmsQueues ? 1231 : 1237);
return result;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionBindingQueryResponseMessage_V2))
return false;
SessionBindingQueryResponseMessage_V2 other = (SessionBindingQueryResponseMessage_V2)obj;
if (autoCreateJmsQueues != other.autoCreateJmsQueues)
return false;
return true;
}
}

View File

@ -32,21 +32,21 @@ import org.apache.activemq.core.server.QueueQueryResult;
*/ */
public class SessionQueueQueryResponseMessage extends PacketImpl public class SessionQueueQueryResponseMessage extends PacketImpl
{ {
private SimpleString name; protected SimpleString name;
private boolean exists; protected boolean exists;
private boolean durable; protected boolean durable;
private int consumerCount; protected int consumerCount;
private long messageCount; protected long messageCount;
private SimpleString filterString; protected SimpleString filterString;
private SimpleString address; protected SimpleString address;
private boolean temporary; protected boolean temporary;
public SessionQueueQueryResponseMessage(final QueueQueryResult result) public SessionQueueQueryResponseMessage(final QueueQueryResult result)
{ {
@ -59,6 +59,11 @@ public class SessionQueueQueryResponseMessage extends PacketImpl
this(null, null, false, false, null, 0, 0, false); this(null, null, false, false, null, 0, 0, false);
} }
public SessionQueueQueryResponseMessage(byte v2)
{
super(v2);
}
private SessionQueueQueryResponseMessage(final SimpleString name, private SessionQueueQueryResponseMessage(final SimpleString name,
final SimpleString address, final SimpleString address,
final boolean durable, final boolean durable,

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.protocol.core.impl.wireformat;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.core.client.impl.QueueQueryImpl;
import org.apache.activemq.core.server.QueueQueryResult;
/**
* @author Justin Bertram
*
*/
public class SessionQueueQueryResponseMessage_V2 extends SessionQueueQueryResponseMessage
{
private boolean autoCreationEnabled;
public SessionQueueQueryResponseMessage_V2(final QueueQueryResult result)
{
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateJmsQueues());
}
public SessionQueueQueryResponseMessage_V2()
{
this(null, null, false, false, null, 0, 0, false, false);
}
private SessionQueueQueryResponseMessage_V2(final SimpleString name,
final SimpleString address,
final boolean durable,
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
final long messageCount,
final boolean exists,
final boolean autoCreationEnabled)
{
super(SESS_QUEUEQUERY_RESP_V2);
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
this.messageCount = messageCount;
this.filterString = filterString;
this.address = address;
this.name = name;
this.exists = exists;
this.autoCreationEnabled = autoCreationEnabled;
}
public boolean isAutoCreationEnabled()
{
return autoCreationEnabled;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer)
{
super.encodeRest(buffer);
buffer.writeBoolean(autoCreationEnabled);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer)
{
super.decodeRest(buffer);
autoCreationEnabled = buffer.readBoolean();
}
@Override
public int hashCode()
{
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreationEnabled ? 1231 : 1237);
return result;
}
public ClientSession.QueueQuery toQueueQuery()
{
return new QueueQueryImpl(isDurable(),
isTemporary(),
getConsumerCount(),
getMessageCount(),
getFilterString(),
getAddress(),
getName(),
isExists(),
isAutoCreationEnabled());
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionQueueQueryResponseMessage_V2))
return false;
SessionQueueQueryResponseMessage_V2 other = (SessionQueueQueryResponseMessage_V2)obj;
if (autoCreationEnabled != other.autoCreationEnabled)
return false;
return true;
}
}

View File

@ -43,29 +43,28 @@ public class QueueQueryResult
private boolean temporary; private boolean temporary;
private boolean autoCreateJmsQueues;
public QueueQueryResult(final SimpleString name, public QueueQueryResult(final SimpleString name,
final SimpleString address, final SimpleString address,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final SimpleString filterString, final SimpleString filterString,
final int consumerCount, final int consumerCount,
final long messageCount) final long messageCount,
final boolean autoCreateJmsQueues)
{ {
this(name, address, durable, temporary, filterString, consumerCount, messageCount, true); this(name, address, durable, temporary, filterString, consumerCount, messageCount, autoCreateJmsQueues, true);
} }
public QueueQueryResult() public QueueQueryResult(final SimpleString name,
{
this(null, null, false, false, null, 0, 0, false);
}
private QueueQueryResult(final SimpleString name,
final SimpleString address, final SimpleString address,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final SimpleString filterString, final SimpleString filterString,
final int consumerCount, final int consumerCount,
final long messageCount, final long messageCount,
final boolean autoCreateJmsQueues,
final boolean exists) final boolean exists)
{ {
this.durable = durable; this.durable = durable;
@ -82,6 +81,8 @@ public class QueueQueryResult
this.name = name; this.name = name;
this.autoCreateJmsQueues = autoCreateJmsQueues;
this.exists = exists; this.exists = exists;
} }
@ -125,4 +126,9 @@ public class QueueQueryResult
return temporary; return temporary;
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
} }

View File

@ -21,4 +21,4 @@ activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionSuffix=${activemq.version.versionSuffix} activemq.version.versionSuffix=${activemq.version.versionSuffix}
activemq.version.versionTag=${activemq.version.versionTag} activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125 activemq.version.compatibleVersionList=121,122,123,124,125,126

View File

@ -415,7 +415,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientSession.AddressQuery query = clientSession.addressQuery(address); ClientSession.AddressQuery query = clientSession.addressQuery(address);
if (!query.isExists()) if (!query.isExists())
{ {
throw new InvalidDestinationException("Destination " + address + " does not exist"); if (query.isAutoCreateJmsQueues())
{
clientSession.createQueue(address, address, true);
}
else
{
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
} }
else else
{ {

View File

@ -326,7 +326,14 @@ public class ActiveMQSession implements QueueSession, TopicSession
if (!response.isExists()) if (!response.isExists())
{ {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); if (response.isAutoCreateJmsQueues())
{
session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
} }
connection.addKnownDestination(jbd.getSimpleAddress()); connection.addKnownDestination(jbd.getSimpleAddress());
@ -730,7 +737,14 @@ public class ActiveMQSession implements QueueSession, TopicSession
if (!response.isExists()) if (!response.isExists())
{ {
throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist"); if (response.isAutoCreateJmsQueues())
{
session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
} }
connection.addKnownDestination(dest.getSimpleAddress()); connection.addKnownDestination(dest.getSimpleAddress());
@ -902,10 +916,17 @@ public class ActiveMQSession implements QueueSession, TopicSession
try try
{ {
AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress())); AddressQuery response = session.addressQuery(new SimpleString(jbq.getAddress()));
if (!message.isExists()) if (!response.isExists())
{ {
throw new InvalidDestinationException(jbq.getAddress() + " does not exist"); if (response.isAutoCreateJmsQueues())
{
session.createQueue(jbq.getSimpleAddress(), jbq.getSimpleAddress(), true);
}
else
{
throw new InvalidDestinationException("Destination " + jbq.getName() + " does not exist");
}
} }
} }
catch (ActiveMQException e) catch (ActiveMQException e)
@ -1239,13 +1260,13 @@ public class ActiveMQSession implements QueueSession, TopicSession
QueueQuery response = session.queueQuery(queue.getSimpleAddress()); QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (response.isExists()) if (!response.isExists() && !response.isAutoCreateJmsQueues())
{ {
return queue; return null;
} }
else else
{ {
return null; return queue;
} }
} }

View File

@ -152,6 +152,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy"; private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy";
private static final String AUTO_CREATE_JMS_QUEUES = "auto-create-jms-queues";
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private boolean validateAIO = false; private boolean validateAIO = false;
@ -1139,6 +1143,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, value); SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, value);
addressSettings.setSlowConsumerPolicy(policy); addressSettings.setSlowConsumerPolicy(policy);
} }
else if (AUTO_CREATE_JMS_QUEUES.equalsIgnoreCase(name))
{
addressSettings.setAutoCreateJmsQueues(XMLUtil.parseBoolean(child));
}
else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name))
{
addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child));
}
} }
return setting; return setting;
} }

View File

@ -1635,6 +1635,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
policy = addressSettings.getSlowConsumerPolicy() == SlowConsumerPolicy.NOTIFY ? "NOTIFY" policy = addressSettings.getSlowConsumerPolicy() == SlowConsumerPolicy.NOTIFY ? "NOTIFY"
: "KILL"; : "KILL";
settings.put("slowConsumerPolicy", policy); settings.put("slowConsumerPolicy", policy);
settings.put("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues());
settings.put("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues());
JSONObject jsonObject = new JSONObject(settings); JSONObject jsonObject = new JSONObject(settings);
return jsonObject.toString(); return jsonObject.toString();
@ -1658,7 +1660,9 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final String addressFullMessagePolicy, final String addressFullMessagePolicy,
final long slowConsumerThreshold, final long slowConsumerThreshold,
final long slowConsumerCheckPeriod, final long slowConsumerCheckPeriod,
final String slowConsumerPolicy) throws Exception final String slowConsumerPolicy,
final boolean autoCreateJmsQueues,
final boolean autoDeleteJmsQueues) throws Exception
{ {
checkStarted(); checkStarted();
@ -1721,6 +1725,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
{ {
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
} }
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
server.getAddressSettingsRepository().addMatch(address, addressSettings); server.getAddressSettingsRepository().addMatch(address, addressSettings);
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings)); storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));

View File

@ -43,4 +43,6 @@ public interface QueueBindingInfo
SimpleString getFilterString(); SimpleString getFilterString();
boolean isAutoCreated();
} }

View File

@ -2014,7 +2014,8 @@ public class JournalStorageManager implements StorageManager
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
binding.getAddress(), binding.getAddress(),
filterString); filterString,
queue.isAutoCreated());
readLock(); readLock();
try try
@ -3027,6 +3028,8 @@ public class JournalStorageManager implements StorageManager
public SimpleString filterString; public SimpleString filterString;
public boolean autoCreated;
public PersistentQueueBindingEncoding() public PersistentQueueBindingEncoding()
{ {
} }
@ -3041,16 +3044,20 @@ public class JournalStorageManager implements StorageManager
address + address +
", filterString=" + ", filterString=" +
filterString + filterString +
", autoCreated=" +
autoCreated +
"]"; "]";
} }
public PersistentQueueBindingEncoding(final SimpleString name, public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address, final SimpleString address,
final SimpleString filterString) final SimpleString filterString,
final boolean autoCreated)
{ {
this.name = name; this.name = name;
this.address = address; this.address = address;
this.filterString = filterString; this.filterString = filterString;
this.autoCreated = autoCreated;
} }
public long getId() public long getId()
@ -3083,11 +3090,17 @@ public class JournalStorageManager implements StorageManager
return name; return name;
} }
public boolean isAutoCreated()
{
return autoCreated;
}
public void decode(final ActiveMQBuffer buffer) public void decode(final ActiveMQBuffer buffer)
{ {
name = buffer.readSimpleString(); name = buffer.readSimpleString();
address = buffer.readSimpleString(); address = buffer.readSimpleString();
filterString = buffer.readNullableSimpleString(); filterString = buffer.readNullableSimpleString();
autoCreated = buffer.readBoolean();
} }
public void encode(final ActiveMQBuffer buffer) public void encode(final ActiveMQBuffer buffer)
@ -3095,12 +3108,13 @@ public class JournalStorageManager implements StorageManager
buffer.writeSimpleString(name); buffer.writeSimpleString(name);
buffer.writeSimpleString(address); buffer.writeSimpleString(address);
buffer.writeNullableSimpleString(filterString); buffer.writeNullableSimpleString(filterString);
buffer.writeBoolean(autoCreated);
} }
public int getEncodeSize() public int getEncodeSize()
{ {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) + return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString); SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN;
} }
} }

View File

@ -72,6 +72,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaData
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@ -81,6 +82,7 @@ import org.apache.activemq.core.protocol.core.impl.wireformat.SessionForceConsum
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@ -230,7 +232,15 @@ public class ServerSessionPacketHandler implements ChannelHandler
{ {
// We send back queue information on the queue as a response- this allows the queue to // We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover // be automatically recreated on failover
response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName())); QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2))
{
response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
}
else
{
response = new SessionQueueQueryResponseMessage(queueQueryResult);
}
} }
break; break;
@ -277,7 +287,14 @@ public class ServerSessionPacketHandler implements ChannelHandler
requiresResponse = true; requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet; SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
response = new SessionQueueQueryResponseMessage(result); if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2))
{
response = new SessionQueueQueryResponseMessage_V2(result);
}
else
{
response = new SessionQueueQueryResponseMessage(result);
}
break; break;
} }
case SESS_BINDINGQUERY: case SESS_BINDINGQUERY:
@ -285,7 +302,14 @@ public class ServerSessionPacketHandler implements ChannelHandler
requiresResponse = true; requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet; SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress()); BindingQueryResult result = session.executeBindingQuery(request.getAddress());
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames()); if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2))
{
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
}
else
{
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
}
break; break;
} }
case SESS_ACKNOWLEDGE: case SESS_ACKNOWLEDGE:

View File

@ -177,6 +177,13 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean durable, boolean durable,
boolean temporary) throws Exception; boolean temporary) throws Exception;
Queue createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
Queue deployQueue(SimpleString address, Queue deployQueue(SimpleString address,
SimpleString queueName, SimpleString queueName,
SimpleString filterString, SimpleString filterString,

View File

@ -1366,4 +1366,8 @@ public interface ActiveMQServerLogger extends BasicLogger
@Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy Configuration. Please use <ha-policy> exclusively or remove. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy Configuration. Please use <ha-policy> exclusively or remove. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT)
void incompatibleWithHAPolicyChosen(String parameter); void incompatibleWithHAPolicyChosen(String parameter);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224065, value = "Failed to remove auto-created queue {0}", format = Message.Format.MESSAGE_FORMAT)
void errorRemovingAutoCreatedQueue(@Cause Exception e, SimpleString bindingName);
} }

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.server;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.utils.ReferenceCounter;
/**
* @author Clebert Suconic
*/
public interface AutoCreatedQueueManager extends ReferenceCounter
{
SimpleString getQueueName();
}

View File

@ -33,11 +33,15 @@ public class BindingQueryResult
private List<SimpleString> queueNames; private List<SimpleString> queueNames;
public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames) private boolean autoCreateJmsQueues;
public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
{ {
this.exists = exists; this.exists = exists;
this.queueNames = queueNames; this.queueNames = queueNames;
this.autoCreateJmsQueues = autoCreateJmsQueues;
} }
public boolean isExists() public boolean isExists()
@ -45,6 +49,11 @@ public class BindingQueryResult
return exists; return exists;
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues;
}
public List<SimpleString> getQueueNames() public List<SimpleString> getQueueNames()
{ {
return queueNames; return queueNames;

View File

@ -49,6 +49,8 @@ public interface Queue extends Bindable
boolean isTemporary(); boolean isTemporary();
boolean isAutoCreated();
void addConsumer(Consumer consumer) throws Exception; void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer); void removeConsumer(Consumer consumer);
@ -62,7 +64,7 @@ public interface Queue extends Bindable
* on shared subscriptions where the queue needs to be deleted when all the * on shared subscriptions where the queue needs to be deleted when all the
* consumers are closed. * consumers are closed.
*/ */
void setConsumersRefCount(ActiveMQServer server); void setConsumersRefCount(ReferenceCounter referenceCounter);
ReferenceCounter getConsumersRefCount(); ReferenceCounter getConsumersRefCount();

View File

@ -39,7 +39,8 @@ public interface QueueFactory
Filter filter, Filter filter,
PageSubscription pageSubscription, PageSubscription pageSubscription,
boolean durable, boolean durable,
boolean temporary); boolean temporary,
boolean autoCreated);
/** /**
* This is required for delete-all-reference to work correctly with paging * This is required for delete-all-reference to work correctly with paging

View File

@ -1193,9 +1193,18 @@ public class ActiveMQServerImpl implements ActiveMQServer
final boolean durable, final boolean durable,
final boolean temporary) throws Exception final boolean temporary) throws Exception
{ {
return createQueue(address, queueName, filterString, durable, temporary, false, false); return createQueue(address, queueName, filterString, durable, temporary, false, false, false);
} }
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, false, false, autoCreated);
}
/** /**
* Creates a transient queue. A queue that will exist as long as there are consumers. * Creates a transient queue. A queue that will exist as long as there are consumers.
@ -1214,7 +1223,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
final SimpleString filterString, final SimpleString filterString,
boolean durable) throws Exception boolean durable) throws Exception
{ {
Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable); Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable, false);
if (!queue.getAddress().equals(address)) if (!queue.getAddress().equals(address))
{ {
@ -1263,7 +1272,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{ {
ActiveMQServerLogger.LOGGER.deployQueue(queueName); ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, filterString, durable, temporary, true, false); return createQueue(address, queueName, filterString, durable, temporary, true, false, false);
} }
public void destroyQueue(final SimpleString queueName) throws Exception public void destroyQueue(final SimpleString queueName) throws Exception
@ -1981,7 +1990,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean ignoreIfExists, final boolean ignoreIfExists,
final boolean transientQueue) throws Exception final boolean transientQueue,
final boolean autoCreated) throws Exception
{ {
QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
@ -2021,11 +2031,16 @@ public class ActiveMQServerImpl implements ActiveMQServer
filter, filter,
pageSubscription, pageSubscription,
durable, durable,
temporary); temporary,
autoCreated);
if (transientQueue) if (transientQueue)
{ {
queue.setConsumersRefCount(this); queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
}
else if (autoCreated)
{
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queueName));
} }
binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.server.impl;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.AutoCreatedQueueManager;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.utils.ReferenceCounterUtil;
/**
* @author Clebert Suconic
*/
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager
{
private final SimpleString queueName;
private final ActiveMQServer server;
private final Runnable runnable = new Runnable()
{
public void run()
{
try
{
Queue queue = server.locateQueue(queueName);
long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount();
if (server.locateQueue(queueName).getMessageCount() == 0)
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount);
}
server.destroyQueue(queueName, null, false);
}
else if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("NOT deleting auto-created queue \"" + queueName + "\" because consumerCount = " + consumerCount + " and messageCount = " + messageCount);
}
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName)
{
this.server = server;
this.queueName = queueName;
}
@Override
public int increment()
{
return referenceCounterUtil.increment();
}
@Override
public int decrement()
{
return referenceCounterUtil.decrement();
}
@Override
public SimpleString getQueueName()
{
return queueName;
}
}

View File

@ -55,6 +55,7 @@ public class LastValueQueue extends QueueImpl
final PageSubscription pageSubscription, final PageSubscription pageSubscription,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
@ -68,6 +69,7 @@ public class LastValueQueue extends QueueImpl
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,

View File

@ -46,6 +46,7 @@ import org.apache.activemq.core.postoffice.Binding;
import org.apache.activemq.core.postoffice.DuplicateIDCache; import org.apache.activemq.core.postoffice.DuplicateIDCache;
import org.apache.activemq.core.postoffice.PostOffice; import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.NodeManager; import org.apache.activemq.core.server.NodeManager;
@ -155,7 +156,13 @@ public class PostOfficeJournalLoader implements JournalLoader
filter, filter,
subscription, subscription,
true, true,
false); false,
queueBindingInfo.isAutoCreated());
if (queueBindingInfo.isAutoCreated())
{
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
}
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId()); Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());

View File

@ -75,7 +75,8 @@ public class QueueFactoryImpl implements QueueFactory
final Filter filter, final Filter filter,
final PageSubscription pageSubscription, final PageSubscription pageSubscription,
final boolean durable, final boolean durable,
final boolean temporary) final boolean temporary,
final boolean autoCreated)
{ {
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@ -89,6 +90,7 @@ public class QueueFactoryImpl implements QueueFactory
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,
@ -104,6 +106,7 @@ public class QueueFactoryImpl implements QueueFactory
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,

View File

@ -61,7 +61,6 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.Consumer; import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.HandleStatus; import org.apache.activemq.core.server.HandleStatus;
import org.apache.activemq.core.server.ActiveMQMessageBundle; import org.apache.activemq.core.server.ActiveMQMessageBundle;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.RoutingContext;
@ -131,6 +130,8 @@ public class QueueImpl implements Queue
private final boolean temporary; private final boolean temporary;
private final boolean autoCreated;
private final PostOffice postOffice; private final PostOffice postOffice;
private volatile boolean queueDestroyed = false; private volatile boolean queueDestroyed = false;
@ -315,6 +316,7 @@ public class QueueImpl implements Queue
final Filter filter, final Filter filter,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
@ -328,6 +330,7 @@ public class QueueImpl implements Queue
null, null,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,
@ -342,6 +345,7 @@ public class QueueImpl implements Queue
final PageSubscription pageSubscription, final PageSubscription pageSubscription,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
@ -362,6 +366,8 @@ public class QueueImpl implements Queue
this.temporary = temporary; this.temporary = temporary;
this.autoCreated = autoCreated;
this.postOffice = postOffice; this.postOffice = postOffice;
this.storageManager = storageManager; this.storageManager = storageManager;
@ -425,11 +431,11 @@ public class QueueImpl implements Queue
} }
// Queue implementation ---------------------------------------------------------------------------------------- // Queue implementation ----------------------------------------------------------------------------------------
public synchronized void setConsumersRefCount(final ActiveMQServer server) public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter)
{ {
if (refCountForConsumers == null) if (refCountForConsumers == null)
{ {
this.refCountForConsumers = new TransientQueueManagerImpl(server, this.name); this.refCountForConsumers = referenceCounter;
} }
} }
@ -449,6 +455,11 @@ public class QueueImpl implements Queue
return temporary; return temporary;
} }
public boolean isAutoCreated()
{
return autoCreated;
}
public SimpleString getName() public SimpleString getName()
{ {
return name; return name;

View File

@ -37,6 +37,7 @@ import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.management.CoreNotificationType; import org.apache.activemq.api.core.management.CoreNotificationType;
import org.apache.activemq.api.core.management.ManagementHelper; import org.apache.activemq.api.core.management.ManagementHelper;
import org.apache.activemq.api.core.management.ResourceNames;
import org.apache.activemq.core.client.impl.ClientMessageImpl; import org.apache.activemq.core.client.impl.ClientMessageImpl;
import org.apache.activemq.core.exception.ActiveMQXAException; import org.apache.activemq.core.exception.ActiveMQXAException;
import org.apache.activemq.core.filter.Filter; import org.apache.activemq.core.filter.Filter;
@ -549,7 +550,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
} }
Queue queue = server.createQueue(address, name, filterString, durable, temporary); // any non-temporary JMS queue created via this method should be marked as auto-created
if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
{
server.createQueue(address, name, filterString, durable, temporary, true);
}
else
{
server.createQueue(address, name, filterString, durable, temporary);
}
if (temporary) if (temporary)
{ {
@ -676,6 +685,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
{ {
boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
if (name == null) if (name == null)
{ {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
@ -699,16 +710,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener
queue.isTemporary(), queue.isTemporary(),
filterString, filterString,
queue.getConsumerCount(), queue.getConsumerCount(),
queue.getMessageCount()); queue.getMessageCount(),
autoCreateJmsQueues);
} }
// make an exception for the management address (see HORNETQ-29) // make an exception for the management address (see HORNETQ-29)
else if (name.equals(managementAddress)) else if (name.equals(managementAddress))
{ {
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1); response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
}
else if (autoCreateJmsQueues)
{
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
} }
else else
{ {
response = new QueueQueryResult(); response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
} }
return response; return response;
@ -716,6 +732,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception
{ {
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
if (address == null) if (address == null)
{ {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull(); throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
@ -726,7 +744,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
// make an exception for the management address (see HORNETQ-29) // make an exception for the management address (see HORNETQ-29)
if (address.equals(managementAddress)) if (address.equals(managementAddress))
{ {
return new BindingQueryResult(true, names); return new BindingQueryResult(true, names, autoCreateJmsQueues);
} }
Bindings bindings = postOffice.getMatchingBindings(address); Bindings bindings = postOffice.getMatchingBindings(address);
@ -739,7 +757,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
} }
} }
return new BindingQueryResult(!names.isEmpty(), names); return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
} }
public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception

View File

@ -56,6 +56,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false; public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
public static final boolean DEFAULT_AUTO_CREATE_QUEUES = true;
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1; public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
public static final long DEFAULT_EXPIRY_DELAY = -1; public static final long DEFAULT_EXPIRY_DELAY = -1;
@ -106,6 +110,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private SlowConsumerPolicy slowConsumerPolicy = null; private SlowConsumerPolicy slowConsumerPolicy = null;
private Boolean autoCreateJmsQueues = null;
private Boolean autoDeleteJmsQueues = null;
public AddressSettings(AddressSettings other) public AddressSettings(AddressSettings other)
{ {
this.addressFullMessagePolicy = other.addressFullMessagePolicy; this.addressFullMessagePolicy = other.addressFullMessagePolicy;
@ -127,6 +135,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.slowConsumerThreshold = other.slowConsumerThreshold; this.slowConsumerThreshold = other.slowConsumerThreshold;
this.slowConsumerCheckPeriod = other.slowConsumerCheckPeriod; this.slowConsumerCheckPeriod = other.slowConsumerCheckPeriod;
this.slowConsumerPolicy = other.slowConsumerPolicy; this.slowConsumerPolicy = other.slowConsumerPolicy;
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
} }
public AddressSettings() public AddressSettings()
@ -134,6 +144,26 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} }
public boolean isAutoCreateJmsQueues()
{
return autoCreateJmsQueues != null ? autoCreateJmsQueues : AddressSettings.DEFAULT_AUTO_CREATE_QUEUES;
}
public void setAutoCreateJmsQueues(final boolean autoCreateJmsQueues)
{
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
public boolean isAutoDeleteJmsQueues()
{
return autoDeleteJmsQueues != null ? autoDeleteJmsQueues : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES;
}
public void setAutoDeleteJmsQueues(final boolean autoDeleteJmsQueues)
{
this.autoDeleteJmsQueues = autoDeleteJmsQueues;
}
public boolean isLastValueQueue() public boolean isLastValueQueue()
{ {
return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE; return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE;
@ -398,6 +428,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
{ {
slowConsumerPolicy = merged.slowConsumerPolicy; slowConsumerPolicy = merged.slowConsumerPolicy;
} }
if (autoCreateJmsQueues == null)
{
autoCreateJmsQueues = merged.autoCreateJmsQueues;
}
if (autoDeleteJmsQueues == null)
{
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
}
} }
@Override @Override
@ -458,6 +496,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
{ {
slowConsumerPolicy = null; slowConsumerPolicy = null;
} }
autoCreateJmsQueues = BufferHelper.readNullableBoolean(buffer);
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
} }
@Override @Override
@ -482,7 +524,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) + BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute) +
BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) + BufferHelper.sizeOfNullableLong(slowConsumerCheckPeriod) +
BufferHelper.sizeOfNullableLong(slowConsumerThreshold) + BufferHelper.sizeOfNullableLong(slowConsumerThreshold) +
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null); BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues);
} }
@Override @Override
@ -526,6 +570,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, slowConsumerCheckPeriod); BufferHelper.writeNullableLong(buffer, slowConsumerCheckPeriod);
buffer.writeNullableSimpleString(slowConsumerPolicy != null ? new SimpleString(slowConsumerPolicy.toString()) : null); buffer.writeNullableSimpleString(slowConsumerPolicy != null ? new SimpleString(slowConsumerPolicy.toString()) : null);
BufferHelper.writeNullableBoolean(buffer, autoCreateJmsQueues);
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -556,6 +604,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerThreshold == null) ? 0 : slowConsumerThreshold.hashCode()); result = prime * result + ((slowConsumerThreshold == null) ? 0 : slowConsumerThreshold.hashCode());
result = prime * result + ((slowConsumerCheckPeriod == null) ? 0 : slowConsumerCheckPeriod.hashCode()); result = prime * result + ((slowConsumerCheckPeriod == null) ? 0 : slowConsumerCheckPeriod.hashCode());
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode()); result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
return result; return result;
} }
@ -705,6 +755,20 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} }
else if (!slowConsumerPolicy.equals(other.slowConsumerPolicy)) else if (!slowConsumerPolicy.equals(other.slowConsumerPolicy))
return false; return false;
if (autoCreateJmsQueues == null)
{
if (other.autoCreateJmsQueues != null)
return false;
}
else if (!autoCreateJmsQueues.equals(other.autoCreateJmsQueues))
return false;
if (autoDeleteJmsQueues == null)
{
if (other.autoDeleteJmsQueues != null)
return false;
}
else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
return false;
return true; return true;
} }
@ -751,6 +815,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
slowConsumerCheckPeriod + slowConsumerCheckPeriod +
", slowConsumerPolicy=" + ", slowConsumerPolicy=" +
slowConsumerPolicy + slowConsumerPolicy +
", autoCreateJmsQueues=" +
autoCreateJmsQueues +
", autoDeleteJmsQueues=" +
autoDeleteJmsQueues +
"]"; "]";
} }
} }

View File

@ -2258,6 +2258,23 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="auto-create-jms-queues" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to automatically create JMS queues when a producer sends or a consumer connects to a
queue
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="auto-delete-jms-queues" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to delete auto-created JMS queues when the queue has 0 consumers and 0 messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required"> <xsd:attribute name="match" type="xsd:string" use="required">

View File

@ -297,6 +297,8 @@ public class FileConfigurationTest extends ConfigurationImplTest
assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold()); assertEquals(10, conf.getAddressesSettings().get("a1").getSlowConsumerThreshold());
assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod()); assertEquals(5, conf.getAddressesSettings().get("a1").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy()); assertEquals(SlowConsumerPolicy.NOTIFY, conf.getAddressesSettings().get("a1").getSlowConsumerPolicy());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsQueues());
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString()); assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
@ -308,6 +310,8 @@ public class FileConfigurationTest extends ConfigurationImplTest
assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold()); assertEquals(20, conf.getAddressesSettings().get("a2").getSlowConsumerThreshold());
assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod()); assertEquals(15, conf.getAddressesSettings().get("a2").getSlowConsumerCheckPeriod());
assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy()); assertEquals(SlowConsumerPolicy.KILL, conf.getAddressesSettings().get("a2").getSlowConsumerPolicy());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsQueues());
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
assertEquals(2, conf.getQueueConfigurations().size()); assertEquals(2, conf.getQueueConfigurations().size());

View File

@ -42,7 +42,6 @@ import org.apache.activemq.core.message.BodyEncoder;
import org.apache.activemq.core.paging.PagingStore; import org.apache.activemq.core.paging.PagingStore;
import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.server.Consumer; import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.RoutingContext;
@ -1053,6 +1052,12 @@ public class ScheduledDeliveryHandlerTest extends Assert
return false; return false;
} }
@Override
public boolean isAutoCreated()
{
return false;
}
@Override @Override
public void addConsumer(Consumer consumer) throws Exception public void addConsumer(Consumer consumer) throws Exception
{ {
@ -1072,7 +1077,7 @@ public class ScheduledDeliveryHandlerTest extends Assert
} }
@Override @Override
public void setConsumersRefCount(ActiveMQServer server) public void setConsumersRefCount(ReferenceCounter referenceCounter)
{ {
} }

View File

@ -46,7 +46,8 @@ public class AddressSettingsTest extends UnitTestCase
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD, addressSettings.getSlowConsumerThreshold());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_CHECK_PERIOD, addressSettings.getSlowConsumerCheckPeriod());
Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy()); Assert.assertEquals(AddressSettings.DEFAULT_SLOW_CONSUMER_POLICY, addressSettings.getSlowConsumerPolicy());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_QUEUES, addressSettings.isAutoCreateJmsQueues());
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteJmsQueues());
} }
@Test @Test

View File

@ -271,6 +271,8 @@
<slow-consumer-threshold>10</slow-consumer-threshold> <slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-check-period>5</slow-consumer-check-period> <slow-consumer-check-period>5</slow-consumer-check-period>
<slow-consumer-policy>NOTIFY</slow-consumer-policy> <slow-consumer-policy>NOTIFY</slow-consumer-policy>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-delete-jms-queues>true</auto-delete-jms-queues>
</address-setting> </address-setting>
<address-setting match="a2"> <address-setting match="a2">
<dead-letter-address>a2.1</dead-letter-address> <dead-letter-address>a2.1</dead-letter-address>
@ -283,6 +285,8 @@
<slow-consumer-threshold>20</slow-consumer-threshold> <slow-consumer-threshold>20</slow-consumer-threshold>
<slow-consumer-check-period>15</slow-consumer-check-period> <slow-consumer-check-period>15</slow-consumer-check-period>
<slow-consumer-policy>KILL</slow-consumer-policy> <slow-consumer-policy>KILL</slow-consumer-policy>
<auto-create-jms-queues>false</auto-create-jms-queues>
<auto-delete-jms-queues>false</auto-delete-jms-queues>
</address-setting> </address-setting>
</address-settings> </address-settings>
<connector-services> <connector-services>

View File

@ -95,6 +95,8 @@ entry that would be found in the `activemq-configuration.xml` file.
<slow-consumer-threshold>-1</slow-consumer-threshold> <slow-consumer-threshold>-1</slow-consumer-threshold>
<slow-consumer-policy>NOTIFY</slow-consumer-policy> <slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-check-period>5</slow-consumer-check-period> <slow-consumer-check-period>5</slow-consumer-check-period>
<auto-create-queues>true</auto-create-queues>
<auto-delete-queues>true</auto-delete-queues>
</address-setting> </address-setting>
</address-settings> </address-settings>
@ -176,3 +178,13 @@ on this notification.
`slow-consumer-check-period`. How often to check for slow consumers on a `slow-consumer-check-period`. How often to check for slow consumers on a
particular queue. Measured in minutes. Default is 5. See ? for more particular queue. Measured in minutes. Default is 5. See ? for more
information about slow consumer detection. information about slow consumer detection.
`auto-create-jms-queues`. Whether or not the broker should automatically
create a JMS queue when a JMS message is sent to a queue whose name fits
the address `match` (remember, a JMS queue is just a core queue which has
the same address and queue name) or a JMS consumer tries to connect to a
queue whose name fits the address `match`. Queues which are auto-created
are durable, non-temporary, and non-transient.
`auto-delete-jms-queues`. Whether or not to the broker should automatically
delete auto-created JMS queues when they have both 0 consumers and 0 messages.

View File

@ -49,7 +49,7 @@
<activemq.version.majorVersion>6</activemq.version.majorVersion> <activemq.version.majorVersion>6</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion> <activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion> <activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>125,124,123,122</activemq.version.incrementingVersion> <activemq.version.incrementingVersion>126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix> <activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix>
<activemq.version.versionTag>SNAPSHOT</activemq.version.versionTag> <activemq.version.versionTag>SNAPSHOT</activemq.version.versionTag>
<ActiveMQ-Version> <ActiveMQ-Version>

View File

@ -77,6 +77,7 @@ public class QueueTest extends UnitTestCase
null, null,
null, null,
false, false,
false,
false); false);
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.client;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.security.Role;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author Justin Bertram
*/
public class AutoCreateJmsQueueTest extends JMSTestBase
{
@Test
public void testAutoCreateOnSendToQueue() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++)
{
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
connection.close();
}
@Test
public void testAutoCreateOnSendToQueueSecurity() throws Exception
{
server.getSecurityManager().addUser("guest", "guest");
server.getSecurityManager().setDefaultUser("guest");
server.getSecurityManager().addRole("guest", "rejectAll");
Role role = new Role("rejectAll", false, false, false, false, false, false, false);
Set<Role> roles = new HashSet<Role>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
try
{
MessageProducer producer = session.createProducer(queue);
Assert.fail("Creating a producer here should throw a JMSSecurityException");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof JMSSecurityException);
}
connection.close();
}
@Test
public void testAutoCreateOnSendToTopic() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
try
{
MessageProducer producer = session.createProducer(topic);
Assert.fail("Creating a producer here should throw an exception");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof InvalidDestinationException);
}
connection.close();
}
@Test
public void testAutoCreateOnConsumeFromQueue() throws Exception
{
Connection connection = null;
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
Message m = messageConsumer.receive(500);
Assert.assertNull(m);
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable();
Assert.assertEquals(0, q.getMessageCount());
Assert.assertEquals(0, q.getMessagesAdded());
connection.close();
}
@Test
public void testAutoCreateOnConsumeFromTopic() throws Exception
{
Connection connection = null;
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ActiveMQJMSClient.createTopic("test");
try
{
MessageConsumer messageConsumer = session.createConsumer(topic);
Assert.fail("Creating a consumer here should throw an exception");
}
catch (Exception e)
{
Assert.assertTrue(e instanceof InvalidDestinationException);
}
connection.close();
}
@Before
@Override
public void setUp() throws Exception
{
super.setUp();
server.getSecurityManager().addUser("guest", "guest");
server.getSecurityManager().setDefaultUser("guest");
server.getSecurityManager().addRole("guest", "allowAll");
Role role = new Role("allowAll", true, true, true, true, true, true, true);
Set<Role> roles = new HashSet<Role>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
}
protected boolean useSecurity()
{
return true;
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.client;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Test;
/**
* @author Justin Bertram
*/
public class AutoDeleteJmsQueueTest extends JMSTestBase
{
@Test
public void testAutoDelete() throws Exception
{
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages - 1; i++)
{
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
session.close();
// ensure the queue is still there
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString("jms.queue.test")).getBindable();
Assert.assertEquals(1, q.getMessageCount());
Assert.assertEquals(numMessages, q.getMessagesAdded());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = session.createConsumer(queue);
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
connection.close();
// ensure the queue was removed
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test")));
}
}

View File

@ -242,6 +242,7 @@ public class HangConsumerTest extends ServiceTestBase
final PageSubscription pageSubscription, final PageSubscription pageSubscription,
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean autoCreated,
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
@ -255,6 +256,7 @@ public class HangConsumerTest extends ServiceTestBase
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,
@ -294,7 +296,8 @@ public class HangConsumerTest extends ServiceTestBase
final Filter filter, final Filter filter,
final PageSubscription pageSubscription, final PageSubscription pageSubscription,
final boolean durable, final boolean durable,
final boolean temporary) final boolean temporary,
final boolean autoCreated)
{ {
queue = new MyQueueWithBlocking(persistenceID, queue = new MyQueueWithBlocking(persistenceID,
address, address,
@ -303,6 +306,7 @@ public class HangConsumerTest extends ServiceTestBase
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,
@ -401,7 +405,7 @@ public class HangConsumerTest extends ServiceTestBase
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, null, null, null, null, null), server.getNodeID()); LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, true, false, false, null, null, null, null, null), server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID); server.getStorageManager().commitBindings(txID);

View File

@ -506,6 +506,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
PageSubscription pageSubscription, PageSubscription pageSubscription,
boolean durable, boolean durable,
boolean temporary, boolean temporary,
boolean autoCreated,
ScheduledExecutorService scheduledExecutor, ScheduledExecutorService scheduledExecutor,
PostOffice postOffice, PostOffice postOffice,
StorageManager storageManager, StorageManager storageManager,
@ -519,6 +520,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,
@ -570,7 +572,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
Filter filter, Filter filter,
PageSubscription pageSubscription, PageSubscription pageSubscription,
boolean durable, boolean durable,
boolean temporary) boolean temporary,
boolean autoCreated)
{ {
return new NoPostACKQueue(persistenceID, return new NoPostACKQueue(persistenceID,
@ -580,6 +583,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
pageSubscription, pageSubscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
storageManager, storageManager,

View File

@ -749,7 +749,9 @@ public class PagingOrderTest extends ServiceTestBase
"PAGE", "PAGE",
-1, -1,
10, 10,
"KILL"); "KILL",
true,
true);
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
new TransportConfiguration(INVM_CONNECTOR_FACTORY)); new TransportConfiguration(INVM_CONNECTOR_FACTORY));
@ -824,7 +826,9 @@ public class PagingOrderTest extends ServiceTestBase
"PAGE", "PAGE",
-1, -1,
10, 10,
"KILL"); "KILL",
true,
true);
jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1"); jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");

View File

@ -82,7 +82,7 @@ public class TopicCleanupTest extends JMSTestBase
{ {
long txid = storage.generateID(); long txid = storage.generateID();
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, server.getScheduledPool(), server.getPostOffice(), final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false, false, server.getScheduledPool(), server.getPostOffice(),
storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());

View File

@ -62,7 +62,7 @@ public class NonExistentQueueTest extends JMSTestBase
@Test @Test
public void sendToNonExistantDestination() throws Exception public void sendToNonExistantDestination() throws Exception
{ {
Destination destination = ActiveMQJMSClient.createQueue("DoesNotExist"); Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist");
TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName()); TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName());
ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
transportConfiguration); transportConfiguration);

View File

@ -507,6 +507,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
long slowConsumerThreshold = 5; long slowConsumerThreshold = 5;
long slowConsumerCheckPeriod = 10; long slowConsumerCheckPeriod = 10;
String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString(); String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString();
boolean autoCreateJmsQueues = false;
boolean autoDeleteJmsQueues = false;
serverControl.addAddressSettings(addressMatch, serverControl.addAddressSettings(addressMatch,
DLA, DLA,
@ -525,7 +527,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy, addressFullMessagePolicy,
slowConsumerThreshold, slowConsumerThreshold,
slowConsumerCheckPeriod, slowConsumerCheckPeriod,
slowConsumerPolicy); slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
boolean ex = false; boolean ex = false;
@ -548,7 +552,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy, addressFullMessagePolicy,
slowConsumerThreshold, slowConsumerThreshold,
slowConsumerCheckPeriod, slowConsumerCheckPeriod,
slowConsumerPolicy); slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
} }
catch (Exception expected) catch (Exception expected)
{ {
@ -578,6 +584,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
serverControl.addAddressSettings(addressMatch, serverControl.addAddressSettings(addressMatch,
DLA, DLA,
@ -596,7 +604,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy, addressFullMessagePolicy,
slowConsumerThreshold, slowConsumerThreshold,
slowConsumerCheckPeriod, slowConsumerCheckPeriod,
slowConsumerPolicy); slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress); jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
@ -618,6 +628,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase
assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold()); assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod()); assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy()); assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
ex = false; ex = false;
@ -640,7 +652,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase
addressFullMessagePolicy, addressFullMessagePolicy,
slowConsumerThreshold, slowConsumerThreshold,
slowConsumerCheckPeriod, slowConsumerCheckPeriod,
slowConsumerPolicy); slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -582,7 +582,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues) throws Exception
{ {
proxy.invokeOperation("addAddressSettings", proxy.invokeOperation("addAddressSettings",
addressMatch, addressMatch,
@ -602,7 +604,9 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
addressFullMessagePolicy, addressFullMessagePolicy,
slowConsumerThreshold, slowConsumerThreshold,
slowConsumerCheckPeriod, slowConsumerCheckPeriod,
slowConsumerPolicy); slowConsumerPolicy,
autoCreateJmsQueues,
autoDeleteJmsQueues);
} }
public void removeAddressSettings(String addressMatch) throws Exception public void removeAddressSettings(String addressMatch) throws Exception

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.tests.integration.persistence; package org.apache.activemq.tests.integration.persistence;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.io.StringReader; import java.io.StringReader;
@ -47,37 +48,40 @@ public class ExportFormatTest extends ServiceTestBase
// Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
String bindingsFile = "#File,JournalFileImpl: (activemq-bindings-1.bindings id = 1, recordID = 1)\n" + String bindingsFile = "#File,JournalFileImpl: (activemq-bindings-1.bindings id = 1, recordID = 1)\n" +
"operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" + "operation@AddRecord,id@2,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAH____8=\n" +
"operation@AddRecord,id@2,userRecordType@21,length@17,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAA=\n" + "operation@AddRecordTX,txID@2,id@3,userRecordType@21,length@18,isUpdate@false,compactCount@0,data@AAAABEEAMQAAAAAEQQAxAAAA\n" +
"operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" + "operation@Commit,txID@2,numberOfRecords@1\n" +
"#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)"; "operation@AddRecord,id@20,userRecordType@24,length@8,isUpdate@false,compactCount@0,data@AAAAAAAAABQ=\n" +
"#File,JournalFileImpl: (activemq-bindings-2.bindings id = 2, recordID = 2)";
// Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field // Case the format was changed, and the change was agreed, use _testCreateFormat to recreate this field
String journalFile = "#File,JournalFileImpl: (activemq-data-1.amq id = 1, recordID = 1)\n" + String journalFile = "#File,JournalFileImpl: (activemq-data-1.amq id = 1, recordID = 1)\n" +
"operation@AddRecordTX,txID@0,id@4,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP40EAQAAAAEAAAAGawBlAHkABgAAAAA=\n" + "operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfYEAQAAAAEAAAAGawBlAHkABgAAAAA=\n" +
"operation@UpdateTX,txID@0,id@4,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@5,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" + "operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAE=\n" +
"operation@UpdateTX,txID@0,id@5,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@6,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" + "operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAI=\n" +
"operation@UpdateTX,txID@0,id@6,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@7,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAABwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" + "operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAM=\n" +
"operation@UpdateTX,txID@0,id@7,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecordTX,txID@0,id@8,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP5EEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" + "operation@AddRecordTX,txID@0,id@9,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAACQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2CfoEAQAAAAEAAAAGawBlAHkABgAAAAQ=\n" +
"operation@UpdateTX,txID@0,id@8,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@UpdateTX,txID@0,id@9,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@Commit,txID@0,numberOfRecords@10\n" + "operation@Commit,txID@0,numberOfRecords@10\n" +
"operation@AddRecord,id@12,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6gEAQAAAAEAAAAGawBlAHkABgAAAAU=\n" + "operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg0EAQAAAAEAAAAGawBlAHkABgAAAAU=\n" +
"operation@Update,id@12,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@13,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6oEAQAAAAEAAAAGawBlAHkABgAAAAY=\n" + "operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2Cg8EAQAAAAEAAAAGawBlAHkABgAAAAY=\n" +
"operation@Update,id@13,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@14,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADgEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP6sEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" + "operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChMEAQAAAAEAAAAGawBlAHkABgAAAAc=\n" +
"operation@Update,id@14,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@15,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAADwEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP60EAQAAAAEAAAAGawBlAHkABgAAAAg=\n" + "operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChcEAQAAAAEAAAAGawBlAHkABgAAAAg=\n" +
"operation@Update,id@15,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"operation@AddRecord,id@16,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEAEAAAAEQQAxAAAA_wAAAAAAAAAAAAABLLxYP64EAQAAAAEAAAAGawBlAHkABgAAAAk=\n" + "operation@AddRecord,id@17,userRecordType@31,length@65,isUpdate@false,compactCount@0,data@AAAAEQAAAE4AAAAAAAAAEQEAAAAEQQAxAAAA_wAAAAAAAAAAAAABSuT2ChoEAQAAAAEAAAAGawBlAHkABgAAAAk=\n" +
"operation@Update,id@16,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAI=\n" + "operation@Update,id@17,userRecordType@32,length@8,isUpdate@true,compactCount@0,data@AAAAAAAAAAM=\n" +
"#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)"; "#File,JournalFileImpl: (activemq-data-2.amq id = 2, recordID = 2)";
public void _testCreateFormat() throws Exception @Test
@Ignore // use this to recreate the format above. Notice we can't change the record format between releases
public void testCreateFormat() throws Exception
{ {
ActiveMQServer server = createServer(true); ActiveMQServer server = createServer(true);
server.start(); server.start();
@ -86,7 +90,7 @@ public class ExportFormatTest extends ServiceTestBase
ClientSessionFactory factory = createSessionFactory(locator); ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, false, false); ClientSession session = factory.createSession(false, false, false);
session.createQueue("A1", "A1"); session.createQueue("A1", "A1", true);
ClientProducer producer = session.createProducer("A1"); ClientProducer producer = session.createProducer("A1");
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)

View File

@ -34,6 +34,7 @@ import javax.jms.XAConnection;
import javax.jms.XASession; import javax.jms.XASession;
import org.apache.activemq.api.jms.JMSFactoryType; import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.Test; import org.junit.Test;
@ -113,6 +114,10 @@ public class SessionTest extends ActiveMQServerTestCase
@Test @Test
public void testCreateNonExistentQueue() throws Exception public void testCreateNonExistentQueue() throws Exception
{ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings);
Connection conn = getConnectionFactory().createConnection(); Connection conn = getConnectionFactory().createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
try try
@ -147,6 +152,10 @@ public class SessionTest extends ActiveMQServerTestCase
@Test @Test
public void testCreateQueueWhileTopicWithSameNameExists() throws Exception public void testCreateQueueWhileTopicWithSameNameExists() throws Exception
{ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateJmsQueues(false);
getJmsServer().getAddressSettingsRepository().addMatch("#", addressSettings);
Connection conn = getConnectionFactory().createConnection(); Connection conn = getConnectionFactory().createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
try try

View File

@ -81,6 +81,7 @@ public class QueueImplTest extends UnitTestCase
null, null,
false, false,
true, true,
false,
scheduledExecutor, scheduledExecutor,
null, null,
null, null,
@ -158,6 +159,7 @@ public class QueueImplTest extends UnitTestCase
null, null,
false, false,
true, true,
false,
scheduledExecutor, scheduledExecutor,
null, null,
null, null,
@ -273,6 +275,7 @@ public class QueueImplTest extends UnitTestCase
null, null,
false, false,
true, true,
false,
scheduledExecutor, scheduledExecutor,
null, null,
null, null,

View File

@ -25,7 +25,6 @@ import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.filter.Filter; import org.apache.activemq.core.filter.Filter;
import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.server.Consumer; import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference; import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.RoutingContext; import org.apache.activemq.core.server.RoutingContext;
@ -60,7 +59,7 @@ public class FakeQueue implements Queue
} }
@Override @Override
public void setConsumersRefCount(ActiveMQServer server) public void setConsumersRefCount(ReferenceCounter referenceCounter)
{ {
} }
@ -427,6 +426,12 @@ public class FakeQueue implements Queue
return false; return false;
} }
@Override
public boolean isAutoCreated()
{
return false;
}
@Override @Override
public LinkedListIterator<MessageReference> iterator() public LinkedListIterator<MessageReference> iterator()
{ {

View File

@ -94,17 +94,7 @@ public class QueueImplTest extends UnitTestCase
{ {
final SimpleString name = new SimpleString("oobblle"); final SimpleString name = new SimpleString("oobblle");
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getNamedQueue(name);
QueueImplTest.address1,
name,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertEquals(name, queue.getName()); Assert.assertEquals(name, queue.getName());
} }
@ -112,31 +102,11 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testDurable() public void testDurable()
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getNonDurableQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
false,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertFalse(queue.isDurable()); Assert.assertFalse(queue.isDurable());
queue = new QueueImpl(1, queue = getDurableQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
true,
false,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertTrue(queue.isDurable()); Assert.assertTrue(queue.isDurable());
} }
@ -150,17 +120,7 @@ public class QueueImplTest extends UnitTestCase
Consumer cons3 = new FakeConsumer(); Consumer cons3 = new FakeConsumer();
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertEquals(0, queue.getConsumerCount()); Assert.assertEquals(0, queue.getConsumerCount());
@ -202,17 +162,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testGetFilter() public void testGetFilter()
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertNull(queue.getFilter()); Assert.assertNull(queue.getFilter());
@ -229,17 +179,7 @@ public class QueueImplTest extends UnitTestCase
} }
}; };
queue = new QueueImpl(1, queue = getFilteredQueue(filter);
QueueImplTest.address1,
QueueImplTest.queue1,
filter,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
Assert.assertEquals(filter, queue.getFilter()); Assert.assertEquals(filter, queue.getFilter());
@ -248,17 +188,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testSimpleadd() public void testSimpleadd()
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -278,17 +208,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testRate() throws InterruptedException public void testRate() throws InterruptedException
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -309,17 +229,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testSimpleNonDirectDelivery() throws Exception public void testSimpleNonDirectDelivery() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -358,17 +268,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testBusyConsumer() throws Exception public void testBusyConsumer() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();
@ -413,17 +313,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testBusyConsumerThenAddMoreMessages() throws Exception public void testBusyConsumerThenAddMoreMessages() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();
@ -491,17 +381,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testaddHeadadd() throws Exception public void testaddHeadadd() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -556,17 +436,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testChangeConsumersAndDeliver() throws Exception public void testChangeConsumersAndDeliver() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -730,17 +600,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testRoundRobinWithQueueing() throws Exception public void testRoundRobinWithQueueing() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -793,17 +653,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testWithPriorities() throws Exception public void testWithPriorities() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
@ -856,17 +706,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testConsumerWithFilterAddAndRemove() public void testConsumerWithFilterAddAndRemove()
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
Filter filter = new FakeFilter("fruit", "orange"); Filter filter = new FakeFilter("fruit", "orange");
@ -876,17 +716,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testIterator() public void testIterator()
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 20; final int numMessages = 20;
@ -925,17 +755,7 @@ public class QueueImplTest extends UnitTestCase
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
Filter filter = new FakeFilter("fruit", "orange"); Filter filter = new FakeFilter("fruit", "orange");
@ -1009,17 +829,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'")); FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@ -1061,17 +871,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'")); FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
@ -1146,17 +946,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testConsumerWithFilterThenAddMoreMessages() throws Exception public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
final int numMessages = 10; final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>(); List<MessageReference> refs = new ArrayList<MessageReference>();
@ -1220,17 +1010,7 @@ public class QueueImplTest extends UnitTestCase
private void testConsumerWithFilters(final boolean direct) throws Exception private void testConsumerWithFilters(final boolean direct) throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
Filter filter = new FakeFilter("fruit", "orange"); Filter filter = new FakeFilter("fruit", "orange");
@ -1323,17 +1103,7 @@ public class QueueImplTest extends UnitTestCase
public void testMessageOrder() throws Exception public void testMessageOrder() throws Exception
{ {
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3); MessageReference messageReference3 = generateReference(queue, 3);
@ -1354,17 +1124,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testMessagesAdded() throws Exception public void testMessagesAdded() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3); MessageReference messageReference3 = generateReference(queue, 3);
@ -1377,17 +1137,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testGetReference() throws Exception public void testGetReference() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3); MessageReference messageReference3 = generateReference(queue, 3);
@ -1401,17 +1151,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testGetNonExistentReference() throws Exception public void testGetNonExistentReference() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3); MessageReference messageReference3 = generateReference(queue, 3);
@ -1430,17 +1170,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testPauseAndResumeWithAsync() throws Exception public void testPauseAndResumeWithAsync() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
// pauses the queue // pauses the queue
queue.pause(); queue.pause();
@ -1498,17 +1228,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testPauseAndResumeWithDirect() throws Exception public void testPauseAndResumeWithDirect() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
// Now add a consumer // Now add a consumer
FakeConsumer consumer = new FakeConsumer(); FakeConsumer consumer = new FakeConsumer();
@ -1553,17 +1273,7 @@ public class QueueImplTest extends UnitTestCase
@Test @Test
public void testResetMessagesAdded() throws Exception public void testResetMessagesAdded() throws Exception
{ {
QueueImpl queue = new QueueImpl(1, QueueImpl queue = getTemporaryQueue();
QueueImplTest.address1,
QueueImplTest.queue1,
null,
false,
true,
scheduledExecutor,
null,
null,
null,
executor);
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference2 = generateReference(queue, 2);
queue.addTail(messageReference); queue.addTail(messageReference);
@ -1668,4 +1378,45 @@ public class QueueImplTest extends UnitTestCase
server.stop(); server.stop();
} }
} }
private QueueImpl getNonDurableQueue()
{
return getQueue(QueueImplTest.queue1, false, false, null);
}
private QueueImpl getDurableQueue()
{
return getQueue(QueueImplTest.queue1, true, false, null);
}
private QueueImpl getNamedQueue(SimpleString name)
{
return getQueue(name, false, true, null);
}
private QueueImpl getFilteredQueue(Filter filter)
{
return getQueue(QueueImplTest.queue1, false, true, filter);
}
private QueueImpl getTemporaryQueue()
{
return getQueue(QueueImplTest.queue1, false, true, null);
}
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter)
{
return new QueueImpl(1,
QueueImplTest.address1,
name,
filter,
durable,
temporary,
false,
scheduledExecutor,
new FakePostOffice(),
null,
null,
executor);
}
} }

View File

@ -49,7 +49,8 @@ public class FakeQueueFactory implements QueueFactory
final Filter filter, final Filter filter,
final PageSubscription subscription, final PageSubscription subscription,
final boolean durable, final boolean durable,
final boolean temporary) final boolean temporary,
final boolean autoCreated)
{ {
return new QueueImpl(persistenceID, return new QueueImpl(persistenceID,
address, address,
@ -58,6 +59,7 @@ public class FakeQueueFactory implements QueueFactory
subscription, subscription,
durable, durable,
temporary, temporary,
autoCreated,
scheduledExecutor, scheduledExecutor,
postOffice, postOffice,
null, null,