ARTEMIS-743 Created QueueConfig that replace and enable additional behaviours on QueueFactory.

Added Filter predicate.
This commit is contained in:
Francesco Nigro 2016-09-20 22:39:28 +02:00 committed by Martyn Taylor
parent e790c78583
commit c002cf13b8
16 changed files with 597 additions and 170 deletions

View File

@ -408,6 +408,8 @@ public interface ActiveMQServerControl {
/**
* Create a durable queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@ -420,6 +422,8 @@ public interface ActiveMQServerControl {
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@ -436,6 +440,8 @@ public interface ActiveMQServerControl {
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@ -450,6 +456,8 @@ public interface ActiveMQServerControl {
/**
* Deploy a durable queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method will do nothing if the queue with the given name already exists on the server.
*
* @param address address to bind the queue to
@ -464,6 +472,8 @@ public interface ActiveMQServerControl {
/**
* Deploy a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method will do nothing if the queue with the given name already exists on the server.
*
* @param address address to bind the queue to
@ -645,7 +655,7 @@ public interface ActiveMQServerControl {
/**
* Lists all the consumers connected to this server.
* The returned String is a JSON string containing details about each consumer, e.g.:
*<pre>
* <pre>
* [
* {
* "queueName": "fa87c64c-0a38-4697-8421-72e34d17429d",
@ -744,7 +754,7 @@ public interface ActiveMQServerControl {
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
void removeAddressSettings(String addressMatch) throws Exception;

View File

@ -21,6 +21,16 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
public interface Filter {
/**
* JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
* with a dummy-filter at this current version as a way to keep its existence valid and TCK
* tests. That subscription needs an invalid filter, however paging needs to ignore any
* subscription with this filter. For that reason, this filter needs to be rejected on paging or
* any other component on the system, and just be ignored for any purpose It's declared here as
* this filter is considered a global ignore
*/
String GENERIC_IGNORED_FILTER = "__AMQX=-1";
boolean match(ServerMessage message);
SimpleString getFilterString();

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.filter;
public final class FilterUtils {
private FilterUtils() {
}
/**
* Returns {@code true} if {@code filter} is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}.
*
* @param filter a subscription filter
* @return {@code true} if {@code filter} is not {@code null} and is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}
*/
public static boolean isTopicIdentification(final Filter filter) {
return filter != null && filter.getFilterString() != null && filter.getFilterString().toString().equals(Filter.GENERIC_IGNORED_FILTER);
}
}

View File

@ -551,7 +551,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
}
finally {
blockOnIO();
@ -569,7 +569,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
}
finally {
blockOnIO();
@ -582,7 +582,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, true, false);
}
finally {
blockOnIO();
@ -595,7 +595,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, durable, false);
}
finally {
blockOnIO();
@ -616,7 +616,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
}
finally {
blockOnIO();

View File

@ -95,10 +95,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
NodeManager getNodeManager();
/** it will release hold a lock for the activation. */
/**
* it will release hold a lock for the activation.
*/
void unlockActivation();
/** it will hold a lock for the activation. This will prevent the activation from happening. */
/**
* it will hold a lock for the activation. This will prevent the activation from happening.
*/
void lockActivation();
/**
@ -266,15 +270,17 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Creates a shared queue. if non durable it will exist as long as there are consumers.
*
* Creates a transient queue. A queue that will exist as long as there are consumers.
* The queue will be deleted as soon as all the consumers are removed.
* <p>
* Notice: the queue won't be deleted until the first consumer arrives.
*
* @param address
* @param name
* @param filterString
* @param durable
* @throws Exception
* @throws ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filter}
* @throws NullPointerException if {@code address} is {@code null}
*/
void createSharedQueue(final SimpleString address,
final SimpleString name,

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
public final class QueueConfig {
private final long id;
private final SimpleString address;
private final SimpleString name;
private final Filter filter;
private final PageSubscription pageSubscription;
private final SimpleString user;
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
public static final class Builder {
private final long id;
private final SimpleString address;
private final SimpleString name;
private Filter filter;
private PagingManager pagingManager;
private SimpleString user;
private boolean durable;
private boolean temporary;
private boolean autoCreated;
private Builder(final long id, final SimpleString name) {
this(id, name, name);
}
private Builder(final long id, final SimpleString name, final SimpleString address) {
this.id = id;
this.name = name;
this.address = address;
this.filter = null;
this.pagingManager = null;
this.user = null;
this.durable = true;
this.temporary = false;
this.autoCreated = true;
validateState();
}
private static boolean isEmptyOrNull(SimpleString value) {
return (value == null || value.length() == 0);
}
private void validateState() {
if (isEmptyOrNull(this.name)) {
throw new IllegalStateException("name can't be null!");
}
if (isEmptyOrNull(this.address)) {
throw new IllegalStateException("address can't be null!");
}
}
public Builder filter(final Filter filter) {
this.filter = filter;
return this;
}
public Builder pagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;
return this;
}
public Builder user(final SimpleString user) {
this.user = user;
return this;
}
public Builder durable(final boolean durable) {
this.durable = durable;
return this;
}
public Builder temporary(final boolean temporary) {
this.temporary = temporary;
return this;
}
public Builder autoCreated(final boolean autoCreated) {
this.autoCreated = autoCreated;
return this;
}
/**
* Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
* <br>
* The reference parameters aren't defensively copied from the {@link Builder} to the {@link QueueConfig}.
* <br>
* This method creates a new {@link PageSubscription} only if {@link #pagingManager} is not {@code null} and
* if {@link FilterUtils#isTopicIdentification} returns {@code false} on {@link #filter}.
*
* @throws IllegalStateException if the creation of {@link PageSubscription} fails
*/
public QueueConfig build() {
final PageSubscription pageSubscription;
if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) {
try {
pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
else {
pageSubscription = null;
}
return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated);
}
}
/**
* Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id} and {@code name}.
* <br>
* The {@code address} is defaulted to the {@code name} value.
* The reference parameters aren't defensively copied.
*
* @param id the id of the queue to be created
* @param name the name of the queue to be created
* @throws IllegalStateException if {@code name} is {@code null} or empty
*/
public static Builder builderWith(final long id, final SimpleString name) {
return new QueueConfig.Builder(id, name);
}
/**
* Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id}, {@code name} and {@code address}.
* <br>
* The reference parameters aren't defensively copied.
*
* @param id the id of the queue to be created
* @param name the name of the queue to be created
* @param address the address of the queue to be created
* @throws IllegalStateException if {@code name} or {@code address} are {@code null} or empty
*/
public static Builder builderWith(final long id, final SimpleString name, final SimpleString address) {
return new QueueConfig.Builder(id, name, address);
}
private QueueConfig(final long id,
final SimpleString address,
final SimpleString name,
final Filter filter,
final PageSubscription pageSubscription,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
this.id = id;
this.address = address;
this.name = name;
this.filter = filter;
this.pageSubscription = pageSubscription;
this.user = user;
this.durable = durable;
this.temporary = temporary;
this.autoCreated = autoCreated;
}
public long id() {
return id;
}
public SimpleString address() {
return address;
}
public SimpleString name() {
return name;
}
public Filter filter() {
return filter;
}
public PageSubscription pageSubscription() {
return pageSubscription;
}
public SimpleString user() {
return user;
}
public boolean isDurable() {
return durable;
}
public boolean isTemporary() {
return temporary;
}
public boolean isAutoCreated() {
return autoCreated;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
QueueConfig that = (QueueConfig) o;
if (id != that.id)
return false;
if (durable != that.durable)
return false;
if (temporary != that.temporary)
return false;
if (autoCreated != that.autoCreated)
return false;
if (address != null ? !address.equals(that.address) : that.address != null)
return false;
if (name != null ? !name.equals(that.name) : that.name != null)
return false;
if (filter != null ? !filter.equals(that.filter) : that.filter != null)
return false;
if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
return false;
return user != null ? user.equals(that.user) : that.user == null;
}
@Override
public int hashCode() {
int result = (int) (id ^ (id >>> 32));
result = 31 * result + (address != null ? address.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + (pageSubscription != null ? pageSubscription.hashCode() : 0);
result = 31 * result + (user != null ? user.hashCode() : 0);
result = 31 * result + (durable ? 1 : 0);
result = 31 * result + (temporary ? 1 : 0);
result = 31 * result + (autoCreated ? 1 : 0);
return result;
}
@Override
public String toString() {
return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}';
}
}

View File

@ -23,12 +23,18 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
/**
* A QueueFactory
*
* <p>
* Implementations of this class know how to create queues with the correct attribute values
* based on default and overrides
*/
public interface QueueFactory {
Queue createQueueWith(final QueueConfig config);
/**
* @deprecated Replaced by {@link #createQueueWith}
*/
@Deprecated
Queue createQueue(long persistenceID,
final SimpleString address,
SimpleString name,

View File

@ -113,6 +113,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.QueueFactory;
@ -172,8 +173,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* subscription with this filter. For that reason, this filter needs to be rejected on paging or
* any other component on the system, and just be ignored for any purpose It's declared here as
* this filter is considered a global ignore
*
* @deprecated Replaced by {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}
*/
public static final String GENERIC_IGNORED_FILTER = "__AMQX=-1";
@Deprecated
public static final String GENERIC_IGNORED_FILTER = Filter.GENERIC_IGNORED_FILTER;
private HAPolicy haPolicy;
@ -184,22 +188,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
* {@link #stop(boolean)} worked as intended.
*/
STARTING,
/**
STARTING, /**
* server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
* about it hold.
*/
STARTED,
/**
STARTED, /**
* stop() was called but has not finished yet. Meant to avoids starting components while
* stop() is executing.
*/
STOPPING,
/**
STOPPING, /**
* Stopped: either stop() has been called and has finished running, or start() has never been
* called.
*/
STOPPED;
STOPPED
}
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
@ -1290,10 +1291,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SessionCallback callback,
OperationContext context,
boolean autoCreateJMSQueues) throws Exception {
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(),
xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(),
defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null,
pagingManager);
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends,
autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa,
connection, storageManager, postOffice, resourceManager, securityStore, managementService,
this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress),
callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager);
}
@Override
@ -1370,7 +1372,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessageCount();
total += ((LocalQueueBinding) binding).getQueue().getMessageCount();
}
}
@ -1383,7 +1385,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded();
total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded();
}
}
@ -1396,7 +1398,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged();
total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged();
}
}
@ -1409,7 +1411,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
total += ((LocalQueueBinding)binding).getQueue().getConsumerCount();
total += ((LocalQueueBinding) binding).getQueue().getConsumerCount();
}
}
@ -1461,25 +1463,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
}
/**
* Creates a transient queue. A queue that will exist as long as there are consumers.
* The queue will be deleted as soon as all the consumers are removed.
* <p>
* Notice: the queue won't be deleted until the first consumer arrives.
*
* @param address
* @param name
* @param filterString
* @param durable
* @throws Exception
*/
@Override
public void createSharedQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception {
Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
//force the old contract about address
if (address == null) {
throw new NullPointerException("address can't be null!");
}
final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@ -1490,8 +1484,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
if (logger.isDebugEnabled()) {
logger.debug("Transient Queue " + name + " created on address " + name +
" with filter=" + filterString);
logger.debug("Transient Queue " + name + " created on address " + name + " with filter=" + filterString);
}
}
@ -1653,7 +1646,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception {
public void callPostQueueDeletionCallbacks(final SimpleString address,
final SimpleString queueName) throws Exception {
for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) {
callback.callback(address, queueName);
}
@ -1933,8 +1927,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
storageManager = createStorageManager();
if (configuration.getClusterConfigurations().size() > 0 &&
ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) {
if (configuration.getClusterConfigurations().size() > 0 && ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) {
ActiveMQServerLogger.LOGGER.clusterSecurityRisk();
}
@ -1984,7 +1977,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
}
return true;
}
@ -2066,7 +2058,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
/** This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */
/**
* This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests.
*/
public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception {
this.fileStoreMonitor = storeMonitor;
pagingManager.injectMonitor(storeMonitor);
@ -2109,7 +2103,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
addressCount++;
}
long maxMemory = Runtime.getRuntime().maxMemory();
if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) {
ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory);
@ -2201,8 +2194,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated) throws Exception {
QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
if (ignoreIfExists) {
return binding.getQueue();
@ -2212,38 +2204,37 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
Filter filter = FilterImpl.createFilter(filterString);
final Filter filter = FilterImpl.createFilter(filterString);
long txID = storageManager.generateID();
long queueID = storageManager.generateID();
final long txID = storageManager.generateID();
final long queueID = storageManager.generateID();
PageSubscription pageSubscription;
if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER)) {
pageSubscription = null;
final QueueConfig.Builder queueConfigBuilder;
if (address == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
}
else {
pageSubscription = pagingManager.getPageStore(address).getCursorProvider().createSubscription(queueID, filter, durable);
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address);
}
final Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, pageSubscription, user, durable, temporary, autoCreated);
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
}
else if (autoCreated) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName));
else if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
}
binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
if (durable) {
storageManager.addQueueBinding(txID, binding);
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
}
try {
postOffice.addBinding(binding);
if (durable) {
postOffice.addBinding(localQueueBinding);
if (queue.isDurable()) {
storageManager.commitBindings(txID);
}
}
@ -2252,11 +2243,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (durable) {
storageManager.rollbackBindings(txID);
}
if (queue != null) {
final PageSubscription pageSubscription = queue.getPageSubscription();
try {
queue.close();
}
if (pageSubscription != null) {
pageSubscription.destroy();
finally {
if (pageSubscription != null) {
pageSubscription.destroy();
}
}
}
catch (Throwable ignored) {
@ -2265,10 +2259,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw e;
}
managementService.registerAddress(address);
managementService.registerQueue(queue, address, storageManager);
managementService.registerAddress(queue.getAddress());
managementService.registerQueue(queue, queue.getAddress(), storageManager);
callPostQueueCreationCallbacks(queueName);
callPostQueueCreationCallbacks(queue.getName());
return queue;
}
@ -2423,6 +2417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private final class ActivationThread extends Thread {
final Runnable runnable;
ActivationThread(Runnable runnable, String name) {
@ -2444,6 +2439,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private final class ConfigurationFileReloader implements ReloadCallback {
@Override
public void reload(URL uri) throws Exception {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());

View File

@ -29,12 +29,12 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@ -67,12 +68,12 @@ public class PostOfficeJournalLoader implements JournalLoader {
protected final PostOffice postOffice;
protected final PagingManager pagingManager;
private StorageManager storageManager;
private final StorageManager storageManager;
private final QueueFactory queueFactory;
protected final NodeManager nodeManager;
private final ManagementService managementService;
private final GroupingHandler groupingHandler;
private Configuration configuration;
private final Configuration configuration;
private Map<Long, Queue> queues;
public PostOfficeJournalLoader(PostOffice postOffice,
@ -113,50 +114,45 @@ public class PostOfficeJournalLoader implements JournalLoader {
public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
List<QueueBindingInfo> queueBindingInfos) throws Exception {
int duplicateID = 0;
for (QueueBindingInfo queueBindingInfo : queueBindingInfos) {
for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) {
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
final Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
boolean isTopicIdentification = filter != null && filter.getFilterString() != null &&
filter.getFilterString().toString().equals(ActiveMQServerImpl.GENERIC_IGNORED_FILTER);
final boolean isTopicIdentification = FilterUtils.isTopicIdentification(filter);
if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) {
if (isTopicIdentification) {
long tx = storageManager.generateID();
final long tx = storageManager.generateID();
storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
storageManager.commitBindings(tx);
continue;
}
else {
SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++));
final SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++));
ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString());
queueBindingInfo.replaceQueueName(newName);
}
}
PageSubscription subscription = null;
if (!isTopicIdentification) {
subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvider().createSubscription(queueBindingInfo.getId(), filter, true);
final QueueConfig.Builder queueConfigBuilder;
if (queueBindingInfo.getAddress() == null) {
queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName());
}
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated());
if (queueBindingInfo.isAutoCreated()) {
else {
queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress());
}
queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated());
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
}
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
queues.put(queueBindingInfo.getId(), queue);
final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
managementService.registerAddress(queue.getAddress());
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -65,6 +66,20 @@ public class QueueFactoryImpl implements QueueFactory {
this.postOffice = postOffice;
}
@Override
public Queue createQueueWith(final QueueConfig config) {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
else {
queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
return queue;
}
@Deprecated
@Override
public Queue createQueue(final long persistenceID,
final SimpleString address,

View File

@ -38,7 +38,8 @@
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the
If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available
on the
classpath. If false then only the core protocol will be available, unless in Embedded mode where users
can inject their own Protocol Managers.
</xsd:documentation>
@ -154,7 +155,8 @@
</xsd:element>
<xsd:element name="password-codec" type="xsd:string"
default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1" minOccurs="0">
default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Class name and its parameters for the Decoder used to decode the masked password. Ignored if
@ -199,9 +201,9 @@
<xsd:element name="jmx-use-broker-name" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Whether or not to use the broker name in the JMX properties
</xsd:documentation>
<xsd:documentation>
Whether or not to use the broker name in the JMX properties
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@ -246,7 +248,8 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how often (in ms) to check the configuration file for modifications
@ -432,7 +435,7 @@
<xsd:element name="queue" maxOccurs="unbounded" minOccurs="0">
<xsd:complexType>
<xsd:all>
<xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="1">
<xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
address for the queue
@ -679,7 +682,8 @@
<xsd:element name="global-max-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Global Max Size before all addresses will enter into their Full Policy configured upon messages being produced.
Global Max Size before all addresses will enter into their Full Policy configured upon messages being
produced.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@ -1233,11 +1237,11 @@
<xsd:complexType name="clusterConnectionChoiceType">
<xsd:sequence>
<xsd:choice maxOccurs="unbounded">
<xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/>
<xsd:element name="cluster-connection" type="cluster-connectionType">
</xsd:element>
</xsd:choice>
<xsd:choice maxOccurs="unbounded">
<xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/>
<xsd:element name="cluster-connection" type="cluster-connectionType">
</xsd:element>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
@ -1350,7 +1354,7 @@
</xsd:element>
<xsd:element name="use-duplicate-detection" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation >
<xsd:annotation>
<xsd:documentation>
should duplicate detection headers be inserted in forwarded messages?
</xsd:documentation>
@ -1360,7 +1364,8 @@
<xsd:element name="forward-when-no-consumers" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic forward-when-no-consumers=true
DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic
forward-when-no-consumers=true
and ON_DEMAND to mimic forward-when-no-consumers=false.
</xsd:documentation>
</xsd:annotation>
@ -1858,7 +1863,8 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The amount of time to wait for the replica to acknowledge it has received all the necessary data from
@ -1923,11 +1929,13 @@
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs
DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back
occurs
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If we have to start as a replicated server this is the amount of time to wait for the replica to
@ -2099,31 +2107,33 @@
</xsd:annotation>
</xsd:element>
<xsd:choice>
<xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
<xsd:annotation>
<xsd:documentation>
The discovery group to use for scale down, if not supplied then the scale-down-connectors or first
invm connector will be used
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="connectors" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group or
first invm connector will be used
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
<xsd:annotation>
<xsd:documentation>
The discovery group to use for scale down, if not supplied then the scale-down-connectors or
first
invm connector will be used
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="connectors" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group
or
first invm connector will be used
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
@ -2252,15 +2262,19 @@
<xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies.
the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and
FAIL policies.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit).
used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before
messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only.
Default = -1 (no limit).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@ -2491,11 +2505,11 @@
</xsd:complexType>
<xsd:complexType name="transportType">
<xsd:simpleContent>
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute name="name" type="xsd:string">
</xsd:attribute>
<xsd:attribute name="name" type="xsd:string">
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
</xsd:simpleContent>
</xsd:complexType>
</xsd:schema>

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.junit.Assert;
import org.junit.Test;
public class QueueConfigTest {
@Test
public void addressMustBeDefaultedToName() {
final QueueConfig queueConfig = QueueConfig.builderWith(1L, new SimpleString("queue_name")).build();
Assert.assertEquals(queueConfig.name(), queueConfig.address());
}
@Test(expected = IllegalStateException.class)
public void cannotAllowNullAddress() {
QueueConfig.builderWith(1L, new SimpleString("queue_name"), null);
}
@Test(expected = IllegalStateException.class)
public void cannotAllowNullNameWithoutAddress() {
QueueConfig.builderWith(1L, null);
}
@Test(expected = IllegalStateException.class)
public void cannotAllowNullNameWithAddress() {
QueueConfig.builderWith(1L, null, new SimpleString("queue_address"));
}
}

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -254,6 +255,13 @@ public class HangConsumerTest extends ActiveMQTestBase {
super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
}
@Override
public Queue createQueueWith(final QueueConfig config) {
queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
return queue;
}
@Deprecated
@Override
public Queue createQueue(final long persistenceID,
final SimpleString address,
@ -535,7 +543,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
public int sendLargeMessage(MessageReference reference,
ServerMessage message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
}
@ -567,9 +579,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
class MyActiveMQServer extends ActiveMQServerImpl {
MyActiveMQServer(Configuration configuration,
MBeanServer mbeanServer,
ActiveMQSecurityManager securityManager) {
MyActiveMQServer(Configuration configuration, MBeanServer mbeanServer, ActiveMQSecurityManager securityManager) {
super(configuration, mbeanServer, securityManager);
}

View File

@ -33,18 +33,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -54,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -61,7 +58,11 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.junit.Assert;
import org.junit.Before;
@ -208,15 +209,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
server.stop();
}
@Test
public void testForcedInterruptUsingJMS() throws Exception {
ActiveMQServer server = createServer(true, isNetty());
server.start();
SimpleString jmsAddress = new SimpleString("jms.queue.Test");
server.createQueue(jmsAddress, jmsAddress, null, true, false);
@ -265,7 +263,6 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
server.stop();
}
@Test
public void testSendNonPersistentQueue() throws Exception {
@ -540,7 +537,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
}
}
class NoPostACKQueueFactory implements QueueFactory {
final class NoPostACKQueueFactory implements QueueFactory {
final StorageManager storageManager;
@ -564,6 +561,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
this.execFactory = execFactory;
}
@Override
public Queue createQueueWith(final QueueConfig config) {
return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, execFactory.getExecutor());
}
@Deprecated
@Override
public Queue createQueue(long persistenceID,
SimpleString address,

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
@ -63,7 +64,8 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
*/
@Test
public void testConcurrentAddsDeliver() throws Exception {
QueueImpl queue = (QueueImpl) queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, null, false, false, false);
QueueImpl queue = (QueueImpl) queueFactory.createQueueWith(QueueConfig.builderWith(1, new SimpleString("address1"), new SimpleString("queue1")).durable(false).temporary(false).autoCreated(false).build());
FakeConsumer consumer = new FakeConsumer();

View File

@ -25,11 +25,12 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
public class FakeQueueFactory implements QueueFactory {
public final class FakeQueueFactory implements QueueFactory {
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@ -37,6 +38,12 @@ public class FakeQueueFactory implements QueueFactory {
private PostOffice postOffice;
@Override
public Queue createQueueWith(final QueueConfig config) {
return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor);
}
@Deprecated
@Override
public Queue createQueue(final long persistenceID,
final SimpleString address,