From c002cf13b84308549db99c07918bf1075a5b75be Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 20 Sep 2016 22:39:28 +0200 Subject: [PATCH] ARTEMIS-743 Created QueueConfig that replace and enable additional behaviours on QueueFactory. Added Filter predicate. --- .../management/ActiveMQServerControl.java | 14 +- .../activemq/artemis/core/filter/Filter.java | 10 + .../artemis/core/filter/FilterUtils.java | 36 +++ .../impl/ActiveMQServerControlImpl.java | 10 +- .../artemis/core/server/ActiveMQServer.java | 16 +- .../artemis/core/server/QueueConfig.java | 270 ++++++++++++++++++ .../artemis/core/server/QueueFactory.java | 8 +- .../core/server/impl/ActiveMQServerImpl.java | 124 ++++---- .../server/impl/PostOfficeJournalLoader.java | 48 ++-- .../core/server/impl/QueueFactoryImpl.java | 15 + .../schema/artemis-configuration.xsd | 116 ++++---- .../artemis/core/server/QueueConfigTest.java | 46 +++ .../integration/client/HangConsumerTest.java | 18 +- .../client/InterruptedLargeMessageTest.java | 23 +- .../core/server/impl/QueueConcurrentTest.java | 4 +- .../server/impl/fakes/FakeQueueFactory.java | 9 +- 16 files changed, 597 insertions(+), 170 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index ab78ef9559..bb55d195b0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -408,6 +408,8 @@ public interface ActiveMQServerControl { /** * Create a durable queue. *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -420,6 +422,8 @@ public interface ActiveMQServerControl { /** * Create a queue. *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -436,6 +440,8 @@ public interface ActiveMQServerControl { /** * Create a queue. *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -450,6 +456,8 @@ public interface ActiveMQServerControl { /** * Deploy a durable queue. *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
* This method will do nothing if the queue with the given name already exists on the server. * * @param address address to bind the queue to @@ -464,6 +472,8 @@ public interface ActiveMQServerControl { /** * Deploy a queue. *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
* This method will do nothing if the queue with the given name already exists on the server. * * @param address address to bind the queue to @@ -645,7 +655,7 @@ public interface ActiveMQServerControl { /** * Lists all the consumers connected to this server. * The returned String is a JSON string containing details about each consumer, e.g.: - *
+    * 
     * [
     *   {
     *     "queueName": "fa87c64c-0a38-4697-8421-72e34d17429d",
@@ -744,7 +754,7 @@ public interface ActiveMQServerControl {
                            @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
                            @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
                            @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
-                           @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics")  boolean autoCreateJmsTopics,
+                           @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
                            @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
 
    void removeAddressSettings(String addressMatch) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 5dd507cab0..41d5e544e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -21,6 +21,16 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public interface Filter {
 
+   /**
+    * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
+    * with a dummy-filter at this current version as a way to keep its existence valid and TCK
+    * tests. That subscription needs an invalid filter, however paging needs to ignore any
+    * subscription with this filter. For that reason, this filter needs to be rejected on paging or
+    * any other component on the system, and just be ignored for any purpose It's declared here as
+    * this filter is considered a global ignore
+    */
+   String GENERIC_IGNORED_FILTER = "__AMQX=-1";
+
    boolean match(ServerMessage message);
 
    SimpleString getFilterString();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java
new file mode 100644
index 0000000000..c5b1a0a7d4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.filter;
+
+public final class FilterUtils {
+
+   private FilterUtils() {
+
+   }
+
+   /**
+    * Returns {@code true} if {@code filter} is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}.
+    *
+    * @param filter a subscription filter
+    * @return {@code true} if {@code filter} is not {@code null} and is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}
+    */
+   public static boolean isTopicIdentification(final Filter filter) {
+      return filter != null && filter.getFilterString() != null && filter.getFilterString().toString().equals(Filter.GENERIC_IGNORED_FILTER);
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 6aabbe312b..362b74aa44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -551,7 +551,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
       clearIO();
       try {
-         server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
+         server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
       }
       finally {
          blockOnIO();
@@ -569,7 +569,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       clearIO();
       try {
 
-         server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+         server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
       }
       finally {
          blockOnIO();
@@ -582,7 +582,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
       clearIO();
       try {
-         server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+         server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, true, false);
       }
       finally {
          blockOnIO();
@@ -595,7 +595,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
       clearIO();
       try {
-         server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+         server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, durable, false);
       }
       finally {
          blockOnIO();
@@ -616,7 +616,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+         server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
       }
       finally {
          blockOnIO();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index ac653359f8..588c17c3c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -95,10 +95,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    NodeManager getNodeManager();
 
-   /** it will release hold a lock for the activation. */
+   /**
+    * it will release hold a lock for the activation.
+    */
    void unlockActivation();
 
-   /** it will hold a lock for the activation. This will prevent the activation from happening. */
+   /**
+    * it will hold a lock for the activation. This will prevent the activation from happening.
+    */
    void lockActivation();
 
    /**
@@ -266,15 +270,17 @@ public interface ActiveMQServer extends ActiveMQComponent {
    boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
 
    /**
-    * Creates a shared queue. if non durable it will exist as long as there are consumers.
-    *
+    * Creates a transient queue. A queue that will exist as long as there are consumers.
+    * The queue will be deleted as soon as all the consumers are removed.
+    * 

* Notice: the queue won't be deleted until the first consumer arrives. * * @param address * @param name * @param filterString * @param durable - * @throws Exception + * @throws ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filter} + * @throws NullPointerException if {@code address} is {@code null} */ void createSharedQueue(final SimpleString address, final SimpleString name, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java new file mode 100644 index 0000000000..64df0da458 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.filter.FilterUtils; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; + +public final class QueueConfig { + + private final long id; + private final SimpleString address; + private final SimpleString name; + private final Filter filter; + private final PageSubscription pageSubscription; + private final SimpleString user; + private final boolean durable; + private final boolean temporary; + private final boolean autoCreated; + + public static final class Builder { + + private final long id; + private final SimpleString address; + private final SimpleString name; + private Filter filter; + private PagingManager pagingManager; + private SimpleString user; + private boolean durable; + private boolean temporary; + private boolean autoCreated; + + private Builder(final long id, final SimpleString name) { + this(id, name, name); + } + + private Builder(final long id, final SimpleString name, final SimpleString address) { + this.id = id; + this.name = name; + this.address = address; + this.filter = null; + this.pagingManager = null; + this.user = null; + this.durable = true; + this.temporary = false; + this.autoCreated = true; + validateState(); + } + + private static boolean isEmptyOrNull(SimpleString value) { + return (value == null || value.length() == 0); + } + + private void validateState() { + if (isEmptyOrNull(this.name)) { + throw new IllegalStateException("name can't be null!"); + } + if (isEmptyOrNull(this.address)) { + throw new IllegalStateException("address can't be null!"); + } + } + + public Builder filter(final Filter filter) { + this.filter = filter; + return this; + } + + + public Builder pagingManager(final PagingManager pagingManager) { + this.pagingManager = pagingManager; + return this; + } + + public Builder user(final SimpleString user) { + this.user = user; + return this; + } + + public Builder durable(final boolean durable) { + this.durable = durable; + return this; + } + + public Builder temporary(final boolean temporary) { + this.temporary = temporary; + return this; + } + + public Builder autoCreated(final boolean autoCreated) { + this.autoCreated = autoCreated; + return this; + } + + + /** + * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}. + *
+ * The reference parameters aren't defensively copied from the {@link Builder} to the {@link QueueConfig}. + *
+ * This method creates a new {@link PageSubscription} only if {@link #pagingManager} is not {@code null} and + * if {@link FilterUtils#isTopicIdentification} returns {@code false} on {@link #filter}. + * + * @throws IllegalStateException if the creation of {@link PageSubscription} fails + */ + public QueueConfig build() { + final PageSubscription pageSubscription; + if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) { + try { + pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable); + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + else { + pageSubscription = null; + } + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated); + } + + } + + /** + * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id} and {@code name}. + *
+ * The {@code address} is defaulted to the {@code name} value. + * The reference parameters aren't defensively copied. + * + * @param id the id of the queue to be created + * @param name the name of the queue to be created + * @throws IllegalStateException if {@code name} is {@code null} or empty + */ + public static Builder builderWith(final long id, final SimpleString name) { + return new QueueConfig.Builder(id, name); + } + + /** + * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id}, {@code name} and {@code address}. + *
+ * The reference parameters aren't defensively copied. + * + * @param id the id of the queue to be created + * @param name the name of the queue to be created + * @param address the address of the queue to be created + * @throws IllegalStateException if {@code name} or {@code address} are {@code null} or empty + */ + public static Builder builderWith(final long id, final SimpleString name, final SimpleString address) { + return new QueueConfig.Builder(id, name, address); + } + + private QueueConfig(final long id, + final SimpleString address, + final SimpleString name, + final Filter filter, + final PageSubscription pageSubscription, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean autoCreated) { + this.id = id; + this.address = address; + this.name = name; + this.filter = filter; + this.pageSubscription = pageSubscription; + this.user = user; + this.durable = durable; + this.temporary = temporary; + this.autoCreated = autoCreated; + } + + public long id() { + return id; + } + + public SimpleString address() { + return address; + } + + public SimpleString name() { + return name; + } + + public Filter filter() { + return filter; + } + + public PageSubscription pageSubscription() { + return pageSubscription; + } + + public SimpleString user() { + return user; + } + + public boolean isDurable() { + return durable; + } + + public boolean isTemporary() { + return temporary; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + QueueConfig that = (QueueConfig) o; + + if (id != that.id) + return false; + if (durable != that.durable) + return false; + if (temporary != that.temporary) + return false; + if (autoCreated != that.autoCreated) + return false; + if (address != null ? !address.equals(that.address) : that.address != null) + return false; + if (name != null ? !name.equals(that.name) : that.name != null) + return false; + if (filter != null ? !filter.equals(that.filter) : that.filter != null) + return false; + if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null) + return false; + return user != null ? user.equals(that.user) : that.user == null; + + } + + @Override + public int hashCode() { + int result = (int) (id ^ (id >>> 32)); + result = 31 * result + (address != null ? address.hashCode() : 0); + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + (filter != null ? filter.hashCode() : 0); + result = 31 * result + (pageSubscription != null ? pageSubscription.hashCode() : 0); + result = 31 * result + (user != null ? user.hashCode() : 0); + result = 31 * result + (durable ? 1 : 0); + result = 31 * result + (temporary ? 1 : 0); + result = 31 * result + (autoCreated ? 1 : 0); + return result; + } + + @Override + public String toString() { + return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}'; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java index 5e9f9f12ad..64e7a5d210 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java @@ -23,12 +23,18 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; /** * A QueueFactory - * + *

* Implementations of this class know how to create queues with the correct attribute values * based on default and overrides */ public interface QueueFactory { + Queue createQueueWith(final QueueConfig config); + + /** + * @deprecated Replaced by {@link #createQueueWith} + */ + @Deprecated Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 38005ed071..9bf084da30 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -113,6 +113,7 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueDeleter; import org.apache.activemq.artemis.core.server.QueueFactory; @@ -172,8 +173,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { * subscription with this filter. For that reason, this filter needs to be rejected on paging or * any other component on the system, and just be ignored for any purpose It's declared here as * this filter is considered a global ignore + * + * @deprecated Replaced by {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER} */ - public static final String GENERIC_IGNORED_FILTER = "__AMQX=-1"; + @Deprecated + public static final String GENERIC_IGNORED_FILTER = Filter.GENERIC_IGNORED_FILTER; private HAPolicy haPolicy; @@ -184,22 +188,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as * {@link #stop(boolean)} worked as intended. */ - STARTING, - /** + STARTING, /** * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions * about it hold. */ - STARTED, - /** + STARTED, /** * stop() was called but has not finished yet. Meant to avoids starting components while * stop() is executing. */ - STOPPING, - /** + STOPPING, /** * Stopped: either stop() has been called and has finished running, or start() has never been * called. */ - STOPPED; + STOPPED } private volatile SERVER_STATE state = SERVER_STATE.STOPPED; @@ -1290,10 +1291,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { SessionCallback callback, OperationContext context, boolean autoCreateJMSQueues) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), - xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), - defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, - pagingManager); + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, + autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, + connection, storageManager, postOffice, resourceManager, securityStore, managementService, + this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), + callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager); } @Override @@ -1370,7 +1372,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessageCount(); + total += ((LocalQueueBinding) binding).getQueue().getMessageCount(); } } @@ -1383,7 +1385,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded(); + total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded(); } } @@ -1396,7 +1398,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged(); + total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged(); } } @@ -1409,7 +1411,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getConsumerCount(); + total += ((LocalQueueBinding) binding).getQueue().getConsumerCount(); } } @@ -1461,25 +1463,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated); } - /** - * Creates a transient queue. A queue that will exist as long as there are consumers. - * The queue will be deleted as soon as all the consumers are removed. - *

- * Notice: the queue won't be deleted until the first consumer arrives. - * - * @param address - * @param name - * @param filterString - * @param durable - * @throws Exception - */ @Override public void createSharedQueue(final SimpleString address, final SimpleString name, final SimpleString filterString, final SimpleString user, boolean durable) throws Exception { - Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); + //force the old contract about address + if (address == null) { + throw new NullPointerException("address can't be null!"); + } + final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); if (!queue.getAddress().equals(address)) { throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); @@ -1490,8 +1484,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } if (logger.isDebugEnabled()) { - logger.debug("Transient Queue " + name + " created on address " + name + - " with filter=" + filterString); + logger.debug("Transient Queue " + name + " created on address " + name + " with filter=" + filterString); } } @@ -1653,7 +1646,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception { + public void callPostQueueDeletionCallbacks(final SimpleString address, + final SimpleString queueName) throws Exception { for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) { callback.callback(address, queueName); } @@ -1933,8 +1927,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { storageManager = createStorageManager(); - if (configuration.getClusterConfigurations().size() > 0 && - ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) { + if (configuration.getClusterConfigurations().size() > 0 && ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) { ActiveMQServerLogger.LOGGER.clusterSecurityRisk(); } @@ -1984,7 +1977,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader()); } - return true; } @@ -2066,7 +2058,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - /** This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */ + /** + * This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. + */ public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception { this.fileStoreMonitor = storeMonitor; pagingManager.injectMonitor(storeMonitor); @@ -2109,7 +2103,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { addressCount++; } - long maxMemory = Runtime.getRuntime().maxMemory(); if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) { ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory); @@ -2201,8 +2194,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated) throws Exception { - QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); - + final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { if (ignoreIfExists) { return binding.getQueue(); @@ -2212,38 +2204,37 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - Filter filter = FilterImpl.createFilter(filterString); + final Filter filter = FilterImpl.createFilter(filterString); - long txID = storageManager.generateID(); - long queueID = storageManager.generateID(); + final long txID = storageManager.generateID(); + final long queueID = storageManager.generateID(); - PageSubscription pageSubscription; - - if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER)) { - pageSubscription = null; + final QueueConfig.Builder queueConfigBuilder; + if (address == null) { + queueConfigBuilder = QueueConfig.builderWith(queueID, queueName); } else { - pageSubscription = pagingManager.getPageStore(address).getCursorProvider().createSubscription(queueID, filter, durable); + queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address); + } - - final Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, pageSubscription, user, durable, temporary, autoCreated); - + final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); + final Queue queue = queueFactory.createQueueWith(queueConfig); if (transientQueue) { - queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName)); + queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } - else if (autoCreated) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName)); + else if (queue.isAutoCreated()) { + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName())); } - binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); - if (durable) { - storageManager.addQueueBinding(txID, binding); + if (queue.isDurable()) { + storageManager.addQueueBinding(txID, localQueueBinding); } try { - postOffice.addBinding(binding); - if (durable) { + postOffice.addBinding(localQueueBinding); + if (queue.isDurable()) { storageManager.commitBindings(txID); } } @@ -2252,11 +2243,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (durable) { storageManager.rollbackBindings(txID); } - if (queue != null) { + final PageSubscription pageSubscription = queue.getPageSubscription(); + try { queue.close(); } - if (pageSubscription != null) { - pageSubscription.destroy(); + finally { + if (pageSubscription != null) { + pageSubscription.destroy(); + } } } catch (Throwable ignored) { @@ -2265,10 +2259,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw e; } - managementService.registerAddress(address); - managementService.registerQueue(queue, address, storageManager); + managementService.registerAddress(queue.getAddress()); + managementService.registerQueue(queue, queue.getAddress(), storageManager); - callPostQueueCreationCallbacks(queueName); + callPostQueueCreationCallbacks(queue.getName()); return queue; } @@ -2423,6 +2417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } private final class ActivationThread extends Thread { + final Runnable runnable; ActivationThread(Runnable runnable, String name) { @@ -2444,6 +2439,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } private final class ConfigurationFileReloader implements ReloadCallback { + @Override public void reload(URL uri) throws Exception { Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 0645dcaf19..ff93ffe7b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -29,12 +29,12 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.filter.FilterUtils; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; -import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.GroupingInfo; @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -67,12 +68,12 @@ public class PostOfficeJournalLoader implements JournalLoader { protected final PostOffice postOffice; protected final PagingManager pagingManager; - private StorageManager storageManager; + private final StorageManager storageManager; private final QueueFactory queueFactory; protected final NodeManager nodeManager; private final ManagementService managementService; private final GroupingHandler groupingHandler; - private Configuration configuration; + private final Configuration configuration; private Map queues; public PostOfficeJournalLoader(PostOffice postOffice, @@ -113,50 +114,45 @@ public class PostOfficeJournalLoader implements JournalLoader { public void initQueues(Map queueBindingInfosMap, List queueBindingInfos) throws Exception { int duplicateID = 0; - for (QueueBindingInfo queueBindingInfo : queueBindingInfos) { + for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) { queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo); - Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString()); + final Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString()); - boolean isTopicIdentification = filter != null && filter.getFilterString() != null && - filter.getFilterString().toString().equals(ActiveMQServerImpl.GENERIC_IGNORED_FILTER); + final boolean isTopicIdentification = FilterUtils.isTopicIdentification(filter); if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) { if (isTopicIdentification) { - long tx = storageManager.generateID(); + final long tx = storageManager.generateID(); storageManager.deleteQueueBinding(tx, queueBindingInfo.getId()); storageManager.commitBindings(tx); continue; } else { - - SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++)); + final SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++)); ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString()); queueBindingInfo.replaceQueueName(newName); } } - - PageSubscription subscription = null; - - if (!isTopicIdentification) { - subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvider().createSubscription(queueBindingInfo.getId(), filter, true); + final QueueConfig.Builder queueConfigBuilder; + if (queueBindingInfo.getAddress() == null) { + queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName()); } - - Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated()); - - if (queueBindingInfo.isAutoCreated()) { + else { + queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress()); + } + queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated()); + final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); + if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); } - Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId()); - - queues.put(queueBindingInfo.getId(), queue); - + final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + queues.put(queue.getID(), queue); postOffice.addBinding(binding); - - managementService.registerAddress(queueBindingInfo.getAddress()); - managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager); + managementService.registerAddress(queue.getAddress()); + managementService.registerQueue(queue, queue.getAddress(), storageManager); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 280bb13728..d8f772dd6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -65,6 +66,20 @@ public class QueueFactoryImpl implements QueueFactory { this.postOffice = postOffice; } + @Override + public Queue createQueueWith(final QueueConfig config) { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); + final Queue queue; + if (addressSettings.isLastValueQueue()) { + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + } + else { + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + } + return queue; + } + + @Deprecated @Override public Queue createQueue(final long persistenceID, final SimpleString address, diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 4d4abd71fb..73aa20b0d2 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -38,7 +38,8 @@ minOccurs="0"> - If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the + If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available + on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. @@ -154,7 +155,8 @@ + default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1" + minOccurs="0"> Class name and its parameters for the Decoder used to decode the masked password. Ignored if @@ -199,9 +201,9 @@ - - Whether or not to use the broker name in the JMX properties - + + Whether or not to use the broker name in the JMX properties + @@ -246,7 +248,8 @@ - + how often (in ms) to check the configuration file for modifications @@ -432,7 +435,7 @@ - + address for the queue @@ -679,7 +682,8 @@ - Global Max Size before all addresses will enter into their Full Policy configured upon messages being produced. + Global Max Size before all addresses will enter into their Full Policy configured upon messages being + produced. @@ -1233,11 +1237,11 @@ - - - - - + + + + + @@ -1350,7 +1354,7 @@ - + should duplicate detection headers be inserted in forwarded messages? @@ -1360,7 +1364,8 @@ - DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic forward-when-no-consumers=true + DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic + forward-when-no-consumers=true and ON_DEMAND to mimic forward-when-no-consumers=false. @@ -1858,7 +1863,8 @@ - + The amount of time to wait for the replica to acknowledge it has received all the necessary data from @@ -1923,11 +1929,13 @@ - DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs + DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back + occurs - + If we have to start as a replicated server this is the amount of time to wait for the replica to @@ -2099,31 +2107,33 @@ - - - - - - The discovery group to use for scale down, if not supplied then the scale-down-connectors or first - invm connector will be used - - - - - - - - - A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group or - first invm connector will be used - - - - - - - - + + + + + + The discovery group to use for scale down, if not supplied then the scale-down-connectors or + first + invm connector will be used + + + + + + + + + A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group + or + first invm connector will be used + + + + + + + + @@ -2252,15 +2262,19 @@ - the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies. + the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and + FAIL policies. - + - used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit). + used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before + messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. + Default = -1 (no limit). @@ -2491,11 +2505,11 @@ - + - - + + - - + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java new file mode 100644 index 0000000000..62e396d00a --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.junit.Assert; +import org.junit.Test; + +public class QueueConfigTest { + + @Test + public void addressMustBeDefaultedToName() { + final QueueConfig queueConfig = QueueConfig.builderWith(1L, new SimpleString("queue_name")).build(); + Assert.assertEquals(queueConfig.name(), queueConfig.address()); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullAddress() { + QueueConfig.builderWith(1L, new SimpleString("queue_name"), null); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullNameWithoutAddress() { + QueueConfig.builderWith(1L, null); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullNameWithAddress() { + QueueConfig.builderWith(1L, null, new SimpleString("queue_address")); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 89432d9699..4ddabec63e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -254,6 +255,13 @@ public class HangConsumerTest extends ActiveMQTestBase { super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager); } + @Override + public Queue createQueueWith(final QueueConfig config) { + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + return queue; + } + + @Deprecated @Override public Queue createQueue(final long persistenceID, final SimpleString address, @@ -535,7 +543,11 @@ public class HangConsumerTest extends ActiveMQTestBase { * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) */ @Override - public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumer, + long bodySize, + int deliveryCount) { return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount); } @@ -567,9 +579,7 @@ public class HangConsumerTest extends ActiveMQTestBase { class MyActiveMQServer extends ActiveMQServerImpl { - MyActiveMQServer(Configuration configuration, - MBeanServer mbeanServer, - ActiveMQSecurityManager securityManager) { + MyActiveMQServer(Configuration configuration, MBeanServer mbeanServer, ActiveMQSecurityManager securityManager) { super(configuration, mbeanServer, securityManager); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index a4be3ad871..f7b1c41f38 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -33,18 +33,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.StoreConfiguration; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; -import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -54,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -61,7 +58,11 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.junit.Assert; import org.junit.Before; @@ -208,15 +209,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { server.stop(); } - - @Test public void testForcedInterruptUsingJMS() throws Exception { ActiveMQServer server = createServer(true, isNetty()); server.start(); - SimpleString jmsAddress = new SimpleString("jms.queue.Test"); server.createQueue(jmsAddress, jmsAddress, null, true, false); @@ -265,7 +263,6 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { server.stop(); } - @Test public void testSendNonPersistentQueue() throws Exception { @@ -540,7 +537,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { } } - class NoPostACKQueueFactory implements QueueFactory { + final class NoPostACKQueueFactory implements QueueFactory { final StorageManager storageManager; @@ -564,6 +561,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { this.execFactory = execFactory; } + @Override + public Queue createQueueWith(final QueueConfig config) { + return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, execFactory.getExecutor()); + } + + @Deprecated @Override public Queue createQueue(long persistenceID, SimpleString address, diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java index adf97fc011..34433ff4b4 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; @@ -63,7 +64,8 @@ public class QueueConcurrentTest extends ActiveMQTestBase { */ @Test public void testConcurrentAddsDeliver() throws Exception { - QueueImpl queue = (QueueImpl) queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, null, false, false, false); + + QueueImpl queue = (QueueImpl) queueFactory.createQueueWith(QueueConfig.builderWith(1, new SimpleString("address1"), new SimpleString("queue1")).durable(false).temporary(false).autoCreated(false).build()); FakeConsumer consumer = new FakeConsumer(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index b507d3e814..06c7e1e6b9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -25,11 +25,12 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -public class FakeQueueFactory implements QueueFactory { +public final class FakeQueueFactory implements QueueFactory { private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory()); @@ -37,6 +38,12 @@ public class FakeQueueFactory implements QueueFactory { private PostOffice postOffice; + @Override + public Queue createQueueWith(final QueueConfig config) { + return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor); + } + + @Deprecated @Override public Queue createQueue(final long persistenceID, final SimpleString address,