From 7dfa0fe7f43012f2f4ee5715049878019fe0bef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Thu, 10 Jan 2019 20:09:02 +0000 Subject: [PATCH] ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs --- .../artemis/api/core/QueueAttributes.java | 13 + .../activemq/artemis/core/PriorityAware.java | 22 ++ .../collections/ArrayResettableIterator.java | 69 ++++ .../utils/collections/MultiIterator.java | 31 ++ .../utils/collections/MultiIteratorBase.java | 79 +++++ .../collections/MultiResettableIterator.java | 43 +++ .../utils/collections/PriorityCollection.java | 324 ++++++++++++++++++ .../utils/collections/RepeatableIterator.java | 27 ++ .../RepeatableIteratorWrapper.java | 66 ++++ .../utils/collections/ResettableIterator.java | 27 ++ .../utils/collections/SingletonIterator.java | 61 ++++ .../utils/collections/UpdatableIterator.java | 92 +++++ .../utils/collections/MultiIteratorTest.java | 75 ++++ .../MultiResettableIteratorTest.java | 127 +++++++ .../collections/PriorityCollectionTest.java | 296 ++++++++++++++++ .../collections/UpdatableIteratorTest.java | 95 +++++ .../config/ActiveMQDefaultConfiguration.java | 6 + .../api/core/client/ClientSession.java | 53 +++ .../core/client/impl/ClientConsumerImpl.java | 10 + .../client/impl/ClientConsumerInternal.java | 2 + .../core/client/impl/ClientSessionImpl.java | 23 +- .../protocol/core/CoreRemotingConnection.java | 5 + .../core/impl/ActiveMQSessionContext.java | 7 +- .../protocol/core/impl/PacketDecoder.java | 4 +- .../core/protocol/core/impl/PacketImpl.java | 10 +- .../SessionCreateConsumerMessage.java | 30 +- .../spi/core/remoting/SessionContext.java | 1 + .../artemis/jms/client/ActiveMQSession.java | 16 +- .../amqp/broker/AMQPSessionCallback.java | 13 +- .../client/HornetQClientSessionContext.java | 5 +- .../protocol/openwire/amq/AMQConsumer.java | 5 +- .../core/ServerSessionPacketHandler.java | 6 +- .../artemis/core/server/Consumer.java | 9 +- .../artemis/core/server/ServerSession.java | 10 +- .../core/server/impl/QueueConsumers.java | 37 ++ .../core/server/impl/QueueConsumersImpl.java | 127 +++++++ .../artemis/core/server/impl/QueueImpl.java | 281 +++++++-------- .../core/server/impl/ServerConsumerImpl.java | 28 ++ .../core/server/impl/ServerSessionImpl.java | 14 +- .../server/impl/QueueConsumersImplTest.java | 136 ++++++++ docs/user-manual/en/SUMMARY.md | 1 + docs/user-manual/en/consumer-priority.md | 45 +++ .../transport/amqp/client/AmqpReceiver.java | 13 +- .../transport/amqp/client/AmqpSession.java | 13 +- .../amqp/AmqpReceiverPriorityTest.java | 134 ++++++++ .../jms/client/ConsumerPriorityTest.java | 297 ++++++++++++++++ .../amq/QueueConsumerPriorityTest.java | 65 ++++ .../jms/tests/message/MessageHeaderTest.java | 10 + .../client/impl/LargeMessageBufferTest.java | 5 + 49 files changed, 2681 insertions(+), 187 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java create mode 100644 docs/user-manual/en/consumer-priority.md create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java index 12c5f86321..0f7eb3c13b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java @@ -32,6 +32,7 @@ public class QueueAttributes implements Serializable { public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers"; public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch"; public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch"; + public static final String CONSUMER_PRIORITY = "consumer-priority"; private RoutingType routingType; private SimpleString filterString; @@ -44,6 +45,7 @@ public class QueueAttributes implements Serializable { private Boolean purgeOnNoConsumers; private Integer consumersBeforeDispatch; private Long delayBeforeDispatch; + private Integer consumerPriority; public void set(String key, String value) { if (key != null && value != null) { @@ -69,6 +71,8 @@ public class QueueAttributes implements Serializable { setConsumersBeforeDispatch(Integer.valueOf(value)); } else if (key.equals(DELAY_BEFORE_DISPATCH)) { setDelayBeforeDispatch(Long.valueOf(value)); + } else if (key.equals(CONSUMER_PRIORITY)) { + setConsumerPriority(Integer.valueOf(value)); } } } @@ -172,4 +176,13 @@ public class QueueAttributes implements Serializable { return this; } + public Integer getConsumerPriority() { + return consumerPriority; + } + + public QueueAttributes setConsumerPriority(Integer consumerPriority) { + this.consumerPriority = consumerPriority; + return this; + } + } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java new file mode 100644 index 0000000000..e90015a1c1 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core; + +public interface PriorityAware { + + int getPriority(); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java new file mode 100644 index 0000000000..c0e248d9d0 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Collection; + +/** + * Provides an Array Iterator that is able to reset, allowing you to iterate over the full array. + * It achieves this though by moving end position mark to the the current cursors position, + * so it round robins, even with reset. + * @param + */ +public class ArrayResettableIterator implements ResettableIterator { + + private final Object[] array; + private int cursor = 0; + private int endPos = -1; + private boolean hasNext; + + public ArrayResettableIterator(Object[] array) { + this.array = array; + reset(); + } + + public static ResettableIterator iterator(Collection collection) { + return new ArrayResettableIterator<>(collection.toArray()); + } + + @Override + public void reset() { + endPos = cursor; + hasNext = array.length > 0; + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public T next() { + if (!hasNext) { + throw new IllegalStateException(); + } + @SuppressWarnings("unchecked") T result = (T) array[cursor]; + cursor++; + if (cursor == array.length) { + cursor = 0; + } + if (cursor == endPos) { + hasNext = false; + } + return result; + } +} \ No newline at end of file diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java new file mode 100644 index 0000000000..738420c7b0 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator extends MultiIteratorBase> { + + public MultiIterator(Iterator[] iterators) { + super(iterators); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java new file mode 100644 index 0000000000..ab866a1318 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Abstract Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + * @param type of the iterator + */ +abstract class MultiIteratorBase> implements Iterator { + + private final I[] iterators; + private int index = -1; + + MultiIteratorBase(I[] iterators) { + this.iterators = iterators; + } + + @Override + public boolean hasNext() { + while (true) { + if (index != -1) { + I currentIterator = get(index); + if (currentIterator.hasNext()) { + return true; + } + } + int next = index + 1; + if (next < iterators.length) { + moveTo(next); + } else { + return false; + } + } + } + + @Override + public T next() { + while (true) { + if (index != -1) { + I currentIterator = get(index); + if (currentIterator.hasNext()) { + return currentIterator.next(); + } + } + int next = index + 1; + if (next < iterators.length) { + moveTo(next); + } else { + return null; + } + } + } + + protected void moveTo(int index) { + this.index = index; + } + + protected I get(int index) { + return iterators[index]; + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java new file mode 100644 index 0000000000..dcea7daa3b --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +/** + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset. + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it. + * + * @param type of the class of the iterator. + */ +public class MultiResettableIterator extends MultiIteratorBase> implements ResettableIterator { + + public MultiResettableIterator(ResettableIterator[] iterators) { + super(iterators); + } + + @Override + protected void moveTo(int index) { + super.moveTo(index); + if (index > -1) { + get(index).reset(); + } + } + + @Override + public void reset() { + moveTo(-1); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java new file mode 100644 index 0000000000..fb96f0b1f5 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = ArrayResettableIterator.iterator(snapshot[i].getValues()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (ResettableIterator[]) Array.newInstance(ResettableIterator.class, length); + } + + @Override + public void forEach(Consumer action) { + Objects.requireNonNull(action); + PriorityHolder[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].getValues().forEach(action); + } + } + + private Collection getCollection(int priority, boolean createIfMissing) { + PriorityHolder[] current = getArray(); + int low = 0; + int high = current.length - 1; + + while (low <= high) { + int mid = (low + high) / 2; + PriorityHolder midVal = current[mid]; + + if (midVal.getPriority() > priority) + low = mid + 1; + else if (midVal.getPriority() < priority) + high = mid - 1; + else + return midVal.getValues(); //key found + } + + if (createIfMissing) { + PriorityHolder[] newArray = newPrioritySetArrayInstance(current.length + 1); + if (low > 0) { + System.arraycopy(current, 0, newArray, 0, low); + } + if (current.length - low > 0) { + System.arraycopy(current, low, newArray, low + 1, current.length - low); + } + newArray[low] = new PriorityHolder(priority, supplier); + setArray(newArray); + return newArray[low].getValues(); + } + return null; + } + + @Override + public synchronized boolean add(T t) { + if (size() == Integer.MAX_VALUE) return false; + boolean result = addInternal(t); + calcSize(); + return result; + } + + private boolean addInternal(T t) { + if (t == null) return false; + Collection priority = getCollection(t.getPriority(), true); + return priority.add(t); + } + + @Override + public synchronized boolean remove(Object o) { + boolean result = removeInternal(o); + calcSize(); + return result; + } + + private boolean removeInternal(Object o) { + if (o instanceof PriorityAware) { + PriorityAware priorityAware = (PriorityAware) o; + Collection priority = getCollection(priorityAware.getPriority(), false); + boolean result = priority != null && priority.remove(priorityAware); + if (priority != null && priority.isEmpty()) { + removeCollection(priorityAware.getPriority()); + } + return result; + } else { + return false; + } + } + + private Collection removeCollection(int priority) { + PriorityHolder[] current = getArray(); + int len = current.length; + int low = 0; + int high = len - 1; + + while (low <= high) { + int mid = (low + high) / 2; + PriorityHolder midVal = current[mid]; + + if (midVal.getPriority() > priority) + low = mid + 1; + else if (midVal.getPriority() < priority) + high = mid - 1; + else { + PriorityHolder[] newArray = newPrioritySetArrayInstance(len - 1); + System.arraycopy(current, 0, newArray, 0, mid); + System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1); + setArray(newArray); + return midVal.getValues(); //key found + } + } + return null; + } + + @Override + public boolean containsAll(Collection c) { + Objects.requireNonNull(c); + for (Object e : c) + if (!contains(e)) + return false; + return true; + } + + @Override + public synchronized boolean addAll(Collection c) { + Objects.requireNonNull(c); + if (size() >= Integer.MAX_VALUE - c.size()) return false; + boolean modified = false; + for (T e : c) + if (addInternal(e)) + modified = true; + calcSize(); + return modified; + } + + @Override + public synchronized boolean removeAll(Collection c) { + Objects.requireNonNull(c); + boolean modified = false; + for (Object o : c) { + if (removeInternal(o)) { + modified = true; + } + } + calcSize(); + return modified; + } + + @Override + public synchronized boolean retainAll(Collection c) { + Objects.requireNonNull(c); + boolean modified = false; + PriorityHolder[] snapshot = getArray(); + for (PriorityHolder priorityHolder : snapshot) { + if (priorityHolder.getValues().retainAll(c)) { + modified = true; + if (priorityHolder.getValues().isEmpty()) { + removeCollection(priorityHolder.getPriority()); + } + } + } + calcSize(); + return modified; + } + + @Override + public synchronized void clear() { + PriorityHolder[] snapshot = getArray(); + for (PriorityHolder priorityHolder : snapshot) { + priorityHolder.getValues().clear(); + } + calcSize(); + } + + @Override + public boolean contains(Object o) { + return o instanceof PriorityAware && contains((PriorityAware) o); + } + + public boolean contains(PriorityAware priorityAware) { + if (priorityAware == null) return false; + Collection prioritySet = getCollection(priorityAware.getPriority(), false); + return prioritySet != null && prioritySet.contains(priorityAware); + } + + private void calcSize() { + PriorityHolder[] current = getArray(); + int size = 0; + for (PriorityHolder priorityHolder : current) { + size += priorityHolder.getValues().size(); + } + this.size = size; + } + + public static class PriorityHolder implements PriorityAware { + + private final int priority; + + private final Collection values; + + public PriorityHolder(int priority, Supplier> supplier) { + this.priority = priority; + this.values = supplier.get(); + } + + @Override + public int getPriority() { + return priority; + } + + public Collection getValues() { + return values; + } + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java new file mode 100644 index 0000000000..cb41eea7c8 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +public interface RepeatableIterator extends Iterator { + + /** + * If the current value should repeat. + */ + void repeat(); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java new file mode 100644 index 0000000000..2c23cba53a --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.function.Consumer; + +public class RepeatableIteratorWrapper implements RepeatableIterator, ResettableIterator { + + private ResettableIterator iterator; + private E last; + private boolean repeat; + + public RepeatableIteratorWrapper(ResettableIterator iterator) { + this.iterator = iterator; + } + + @Override + public void repeat() { + if (last != null) { + repeat = true; + } + } + + @Override + public boolean hasNext() { + return repeat || iterator.hasNext(); + } + + @Override + public E next() { + if (repeat) { + repeat = false; + return last; + } + return last = iterator.next(); + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public void forEachRemaining(Consumer action) { + iterator.forEachRemaining(action); + } + + @Override + public void reset() { + iterator.reset(); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java new file mode 100644 index 0000000000..f807eb196c --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +public interface ResettableIterator extends Iterator { + + /** + * Resets the iterator so you can re-iterate over all elements. + */ + void reset(); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java new file mode 100644 index 0000000000..956e045e23 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Consumer; + +public class SingletonIterator implements Iterator { + + private E value; + + public static Iterator newInstance(E e) { + return new SingletonIterator<>(e); + } + + private SingletonIterator(E value) { + this.value = value; + } + + @Override + public boolean hasNext() { + return value != null; + } + + @Override + public E next() { + if (value != null) { + E result = value; + value = null; + return result; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + value = null; + } + + @Override + public void forEachRemaining(Consumer action) { + if (value != null) + action.accept(value); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java new file mode 100644 index 0000000000..3ed82b4d7b --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +public class UpdatableIterator implements ResettableIterator, RepeatableIterator { + + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(UpdatableIterator.class, RepeatableIteratorWrapper.class, "changedIterator"); + private volatile RepeatableIteratorWrapper changedIterator; + private RepeatableIteratorWrapper currentIterator; + + public UpdatableIterator(ResettableIterator iterator) { + this.currentIterator = new RepeatableIteratorWrapper<>(iterator); + } + + /** + * This can be called by another thread. + * It sets a new iterator, that will be picked up on the next reset. + * + * @param iterator the new iterator to update to. + */ + public void update(ResettableIterator iterator) { + changedIteratorFieldUpdater.set(this, new RepeatableIteratorWrapper<>(iterator)); + } + + /* + * ---- ResettableIterator Methods ----- + * All the below ResettableIterator (including reset) methods MUST be called by the same thread, + * this is as any other use of Iterator. + */ + + /** + * When reset is called, then if a new iterator has been provided by another thread via update method, + * then we switch over to using the new iterator. + * + * It is important that on nulling off the changedIterator, we atomically compare and set as the + * changedIterator could be further updated by another thread whilst we are resetting, + * the subsequent update simply would be picked up on the next reset. + * + * @return this (itself). + */ + @Override + public void reset() { + RepeatableIteratorWrapper changedIterator = this.changedIterator; + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } + currentIterator.reset(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public E next() { + return currentIterator.next(); + } + + @Override + public void remove() { + currentIterator.remove(); + } + + @Override + public void forEachRemaining(Consumer action) { + currentIterator.forEachRemaining(action); + } + + @Override + public void repeat() { + currentIterator.repeat(); + } +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java new file mode 100644 index 0000000000..9947a5e53b --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MultiIteratorTest { + + @Test + public void testSingleIterator() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + + MultiIterator iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator()}); + for (int i = 0; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + } + + @Test + public void testMutlipleIterators() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + ArrayList arrayList2 = new ArrayList<>(); + for (int i = 1000; i < 2000; i++) { + arrayList2.add(i); + } + ArrayList arrayList3 = new ArrayList<>(); + for (int i = 2000; i < 3000; i++) { + arrayList3.add(i); + } + + MultiIterator iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator(), arrayList2.iterator(), arrayList3.iterator()}); + for (int i = 0; i < 3000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java new file mode 100644 index 0000000000..211679a822 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class MultiResettableIteratorTest { + + @Test + public void testSingleIterator() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + + MultiResettableIterator iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList)}); + for (int i = 0; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + iterator.reset(); + + for (int i = 0; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + } + + @Test + public void testMutlipleIterators() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + ArrayList arrayList2 = new ArrayList<>(); + for (int i = 1000; i < 2000; i++) { + arrayList2.add(i); + } + ArrayList arrayList3 = new ArrayList<>(); + for (int i = 2000; i < 3000; i++) { + arrayList3.add(i); + } + + MultiResettableIterator iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)}); + for (int i = 0; i < 3000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + + //Reset and ensure we re-iterate all. + iterator.reset(); + + for (int i = 0; i < 3000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + + } + + @Test + public void testMutlipleIteratorsResetMidIteration() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + ArrayList arrayList2 = new ArrayList<>(); + for (int i = 1000; i < 2000; i++) { + arrayList2.add(i); + } + ArrayList arrayList3 = new ArrayList<>(); + for (int i = 2000; i < 3000; i++) { + arrayList3.add(i); + } + + MultiResettableIterator iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)}); + for (int i = 0; i < 100; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + + //Reset and ensure we re-iterate all. + iterator.reset(); + + for (int i = 0; i < 3000; i++) { + assertTrue(iterator.hasNext()); + assertNotNull(iterator.next()); + } + assertFalse(iterator.hasNext()); + + + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java new file mode 100644 index 0000000000..97b1a6d34d --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java @@ -0,0 +1,296 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class PriorityCollectionTest { + + @Test + public void simpleInsertions() { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + + assertTrue(set.isEmpty()); + assertTrue(set.add(new TestPriorityAware(1))); + assertFalse(set.isEmpty()); + + assertTrue(set.add(new TestPriorityAware(2))); + assertTrue(set.add(new TestPriorityAware(3))); + + assertEquals(set.size(), 3); + + assertTrue(set.contains(new TestPriorityAware(1))); + assertEquals(set.size(), 3); + + assertTrue(set.remove(new TestPriorityAware(1))); + assertEquals(set.size(), 2); + assertFalse(set.contains(new TestPriorityAware(1))); + assertFalse(set.contains(new TestPriorityAware(5))); + assertEquals(set.size(), 2); + + assertTrue(set.add(new TestPriorityAware(1))); + assertEquals(set.size(), 3); + assertFalse(set.add(new TestPriorityAware(1))); + assertEquals(set.size(), 3); + } + + @Test + public void testRemove() { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + + assertTrue(set.isEmpty()); + assertTrue(set.add(new TestPriorityAware(1))); + assertFalse(set.isEmpty()); + + assertFalse(set.remove(new TestPriorityAware(0))); + assertFalse(set.isEmpty()); + assertTrue(set.remove(new TestPriorityAware(1))); + assertTrue(set.isEmpty()); + } + + @Test + public void concurrentInsertions() throws Throwable { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 1000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + set.add(new TestPriorityAware(key)); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(set.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void concurrentInsertionsAndReads() throws Throwable { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 1000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + set.add(new TestPriorityAware(key)); + } + })); + + futures.add(executor.submit(() -> { + Iterator iterator = set.resettableIterator(); + while (iterator.hasNext()) { + iterator.next(); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(set.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void testIteration() { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + + assertFalse(set.iterator().hasNext()); + + set.add(new TestPriorityAware(0)); + + assertTrue(set.iterator().hasNext()); + + set.remove(new TestPriorityAware(0)); + + assertFalse(set.iterator().hasNext()); + + set.add(new TestPriorityAware(0)); + set.add(new TestPriorityAware(1)); + set.add(new TestPriorityAware(2)); + + List values = new ArrayList<>(set); + + assertTrue(values.contains(new TestPriorityAware(0))); + assertTrue(values.contains(new TestPriorityAware(1))); + assertTrue(values.contains(new TestPriorityAware(2))); + + set.clear(); + assertTrue(set.isEmpty()); + } + + @Test + public void priorityTest() { + PriorityCollection set = new PriorityCollection<>(CopyOnWriteArraySet::new); + set.add(new TestPriority("A", 127)); + set.add(new TestPriority("B", 127)); + set.add(new TestPriority("E", 0)); + set.add(new TestPriority("D", 20)); + set.add(new TestPriority("C", 127)); + + ResettableIterator iterator = set.resettableIterator(); + iterator.reset(); + assertTrue(iterator.hasNext()); + + assertEquals("A", iterator.next().getName()); + + //Reset iterator should mark start as current position + iterator.reset(); + assertTrue(iterator.hasNext()); + assertEquals("B", iterator.next().getName()); + + assertTrue(iterator.hasNext()); + assertEquals("C", iterator.next().getName()); + + //Expect another A as after reset, we started at B so after A we then expect the next level + assertTrue(iterator.hasNext()); + assertEquals("A", iterator.next().getName()); + + assertTrue(iterator.hasNext()); + assertEquals("D", iterator.next().getName()); + + assertTrue(iterator.hasNext()); + assertEquals("E", iterator.next().getName()); + + //We have iterated all. + assertFalse(iterator.hasNext()); + + //Reset to iterate again. + iterator.reset(); + + //We expect the iteration to round robin from last returned at the level. + assertTrue(iterator.hasNext()); + assertEquals("B", iterator.next().getName()); + + + } + + + private class TestPriorityAware implements PriorityAware { + + private long value; + + private TestPriorityAware(long value) { + this.value = value; + } + + @Override + public int getPriority() { + //10 priority levels + return (int) value % 10; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestPriorityAware that = (TestPriorityAware) o; + return value == that.value; + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } + + private class TestPriority implements PriorityAware { + + private String name; + private int priority; + + private TestPriority(String name, int priority) { + this.name = name; + this.priority = priority; + } + + @Override + public int getPriority() { + return priority; + } + + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestPriority that = (TestPriority) o; + return priority == that.priority && + Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, priority); + } + } + +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java new file mode 100644 index 0000000000..633672afa6 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class UpdatableIteratorTest { + + @Test + public void testUnderlyingIterator() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + + UpdatableIterator iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList)); + for (int i = 0; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + iterator.reset(); + + for (int i = 0; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + } + + @Test + public void testUpdateIterator() { + + ArrayList arrayList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + arrayList.add(i); + } + + ArrayList arrayList2 = new ArrayList<>(); + for (int i = 4000; i < 5000; i++) { + arrayList2.add(i); + } + + UpdatableIterator iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList)); + for (int i = 0; i < 100; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + + //Update the iterator + iterator.update(ArrayResettableIterator.iterator(arrayList2)); + + //Ensure the current iterator in use is not updated until reset, and we iterate remaining. + for (int i = 100; i < 1000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + + //Reset the iterator, we now expect to act on the updated iterator. + iterator.reset(); + + for (int i = 4000; i < 5000; i++) { + assertTrue(iterator.hasNext()); + assertEquals(Integer.valueOf(i), iterator.next()); + } + assertFalse(iterator.hasNext()); + } +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 8760c9c2ad..7d20cfde8c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -473,6 +473,8 @@ public final class ActiveMQDefaultConfiguration { public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1; + public static final int DEFAULT_CONSUMER_PRIORITY = 0; + public static final boolean DEFAULT_EXCLUSIVE = false; public static final boolean DEFAULT_LAST_VALUE = false; @@ -1320,6 +1322,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_MAX_QUEUE_CONSUMERS; } + public static int getDefaultConsumerPriority() { + return DEFAULT_CONSUMER_PRIORITY; + } + public static boolean getDefaultExclusive() { return DEFAULT_EXCLUSIVE; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index b400c911fd..14ca75cc59 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -865,6 +865,30 @@ public interface ClientSession extends XAResource, AutoCloseable { SimpleString filter, boolean browseOnly) throws ActiveMQException; + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param priority the consumer priority + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws ActiveMQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, + SimpleString filter, + int priority, + boolean browseOnly) throws ActiveMQException; + /** * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with * the given name. @@ -891,6 +915,35 @@ public interface ClientSession extends XAResource, AutoCloseable { int maxRate, boolean browseOnly) throws ActiveMQException; + + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param priority the consumer priority + * @param windowSize the consumer window size + * @param maxRate the maximum rate to consume messages + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws ActiveMQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, + SimpleString filter, + int priority, + int windowSize, + int maxRate, + boolean browseOnly) throws ActiveMQException; + /** * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with * the given name. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 4946efb960..7f9ede3cf6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -70,6 +70,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private final SimpleString filterString; + private final int priority; + private final SimpleString queueName; private final boolean browseOnly; @@ -141,6 +143,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { final ConsumerContext consumerContext, final SimpleString queueName, final SimpleString filterString, + final int priority, final boolean browseOnly, final int initialWindow, final int clientWindowSize, @@ -157,6 +160,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { this.filterString = filterString; + this.priority = priority; + this.browseOnly = browseOnly; this.sessionContext = sessionContext; @@ -562,6 +567,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { return filterString; } + @Override + public int getPriority() { + return priority; + } + @Override public SimpleString getQueueName() { return queueName; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java index 819082134b..55d30e7e6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java @@ -29,6 +29,8 @@ public interface ClientConsumerInternal extends ClientConsumer { SimpleString getFilterString(); + int getPriority(); + boolean isBrowseOnly(); void handleMessage(ClientMessageInternal message) throws Exception; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 5efaed682f..e8e39e74a6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -798,6 +798,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly); } + @Override + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final int priority, + final boolean browseOnly) throws ActiveMQException { + return createConsumer(queueName, filterString, priority, consumerWindowSize, consumerMaxRate, browseOnly); + } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws ActiveMQException { @@ -821,6 +829,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return sessionContext.isWritable(callback); } + @Override + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final int windowSize, + final int maxRate, + final boolean browseOnly) throws ActiveMQException { + return createConsumer(queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), windowSize, maxRate, browseOnly); + } + /** * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on * the remoting thread). @@ -834,10 +851,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, + final int priority, final int windowSize, final int maxRate, final boolean browseOnly) throws ActiveMQException { - return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly); + return internalCreateConsumer(queueName, filterString, priority, windowSize, maxRate, browseOnly); } @Override @@ -1931,12 +1949,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi */ private ClientConsumer internalCreateConsumer(final SimpleString queueName, final SimpleString filterString, + final int priority, final int windowSize, final int maxRate, final boolean browseOnly) throws ActiveMQException { checkClosed(); - ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor); + ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor); addConsumer(consumer); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 74d9847ffb..fac76e4dcc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -41,6 +41,11 @@ public interface CoreRemotingConnection extends RemotingConnection { return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION); } + default boolean isVersionSupportConsumerPriority() { + int version = getChannelVersion(); + return version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION; + } + /** * Sets the client protocol used on the communication. This will determine if the client has * support for certain packet types diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index ccf10ab1ce..bfe8ec0287 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -367,6 +367,7 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, + int priority, int windowSize, int maxRate, int ackBatchSize, @@ -377,7 +378,7 @@ public class ActiveMQSessionContext extends SessionContext { ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); - SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true); SessionQueueQueryResponseMessage queueInfo; @@ -392,7 +393,7 @@ public class ActiveMQSessionContext extends SessionContext { // The value we send is just a hint final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize; - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } @Override @@ -875,7 +876,7 @@ public class ActiveMQSessionContext extends SessionContext { sendPacketWithoutLock(sessionChannel, createQueueRequest); } - SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false); + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.getPriority(), consumerInternal.isBrowseOnly(), false); sendPacketWithoutLock(sessionChannel, createConsumerRequest); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 9a8166e31c..f576feaf5e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; @@ -39,8 +40,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage; @@ -91,7 +92,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 0168a47c66..62bf423dfe 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -33,7 +33,9 @@ public class PacketImpl implements Packet { public static final int ADDRESSING_CHANGE_VERSION = 129; // 2.7.0 - public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; + public static final int ARTEMIS_2_7_0_VERSION = 130; + public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION; + public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION; public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); @@ -318,7 +320,7 @@ public class PacketImpl implements Packet { encodeHeader(buffer); - encodeRest(buffer); + encodeRest(buffer, connection); encodeSize(buffer); @@ -394,6 +396,10 @@ public class PacketImpl implements Packet { public void encodeRest(final ActiveMQBuffer buffer) { } + public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) { + encodeRest(buffer); + } + public void decodeRest(final ActiveMQBuffer buffer) { } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java index e07b50ca2f..8db65b8d60 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; public class SessionCreateConsumerMessage extends QueueAbstractPacket { @@ -25,6 +27,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { private SimpleString filterString; + private int priority; + private boolean browseOnly; private boolean requiresResponse; @@ -32,6 +36,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { public SessionCreateConsumerMessage(final long id, final SimpleString queueName, final SimpleString filterString, + final int priority, final boolean browseOnly, final boolean requiresResponse) { super(SESS_CREATECONSUMER); @@ -39,6 +44,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { this.id = id; this.queueName = queueName; this.filterString = filterString; + this.priority = priority; this.browseOnly = browseOnly; this.requiresResponse = requiresResponse; } @@ -55,6 +61,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { buff.append(", id=" + id); buff.append(", browseOnly=" + browseOnly); buff.append(", requiresResponse=" + requiresResponse); + buff.append(", priority=" + priority); buff.append("]"); return buff.toString(); } @@ -67,6 +74,10 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { return filterString; } + public int getPriority() { + return priority; + } + public boolean isBrowseOnly() { return browseOnly; } @@ -84,17 +95,25 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { this.filterString = filterString; } + public void setPriority(byte priority) { + this.priority = priority; + } + public void setBrowseOnly(boolean browseOnly) { this.browseOnly = browseOnly; } @Override - public void encodeRest(final ActiveMQBuffer buffer) { + public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) { buffer.writeLong(id); buffer.writeSimpleString(queueName); buffer.writeNullableSimpleString(filterString); buffer.writeBoolean(browseOnly); buffer.writeBoolean(requiresResponse); + //Priority Support added in 2.7.0 + if (coreRemotingConnection.isVersionSupportConsumerPriority()) { + buffer.writeInt(priority); + } } @Override @@ -104,6 +123,12 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { filterString = buffer.readNullableSimpleString(); browseOnly = buffer.readBoolean(); requiresResponse = buffer.readBoolean(); + //Priority Support Added in 2.7.0 + if (buffer.readableBytes() > 0) { + priority = buffer.readInt(); + } else { + priority = ActiveMQDefaultConfiguration.getDefaultConsumerPriority(); + } } @Override @@ -113,6 +138,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { result = prime * result + (browseOnly ? 1231 : 1237); result = prime * result + ((filterString == null) ? 0 : filterString.hashCode()); result = prime * result + (int) (id ^ (id >>> 32)); + result = prime * result + priority; result = prime * result + ((queueName == null) ? 0 : queueName.hashCode()); result = prime * result + (requiresResponse ? 1231 : 1237); return result; @@ -134,6 +160,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { return false; } else if (!filterString.equals(other.filterString)) return false; + if (priority != other.priority) + return false; if (id != other.id) return false; if (queueName == null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index e25441b012..93352049be 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -314,6 +314,7 @@ public abstract class SessionContext { public abstract ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, + int priority, int windowSize, int maxRate, int ackBatchSize, diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 693ab84ead..60f5d8ba09 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.QueueAttributes; @@ -710,7 +711,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { } } - consumer = session.createConsumer(queueName, null, false); + consumer = createClientConsumer(dest, queueName, null); ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(options, connection, this, consumer, false, dest, selectorString, autoDeleteQueueName); @@ -779,7 +780,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { connection.addKnownDestination(dest.getSimpleAddress()); - consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false); + consumer = createClientConsumer(dest, null, coreFilterString); } else { AddressQuery response = session.addressQuery(dest.getSimpleAddress()); @@ -804,8 +805,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response); - consumer = session.createConsumer(queueName, null, false); - + consumer = createClientConsumer(dest, queueName, null); autoDeleteQueueName = queueName; } else { // Durable sub @@ -860,7 +860,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { } } - consumer = session.createConsumer(queueName, null, false); + consumer = createClientConsumer(dest, queueName, null); } } @@ -874,6 +874,12 @@ public class ActiveMQSession implements QueueSession, TopicSession { } } + private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException { + QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes(); + int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority(); + return session.createConsumer(queueName == null ? destination.getSimpleAddress() : queueName, coreFilterString, priority, false); + } + public void ackAllConsumers() throws JMSException { checkClosed(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c3643873a6..f850cc185e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; +import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -80,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback { private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class); + private static final Symbol PRIORITY = Symbol.getSymbol("priority"); + protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); private final AMQPConnectionCallback protonSPI; @@ -199,7 +203,9 @@ public class AMQPSessionCallback implements SessionCallback { filter = SelectorTranslator.convertToActiveMQFilterString(filter); - ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null); + int priority = getPriority(protonSender.getSender().getRemoteProperties()); + + ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), priority, browserOnly, false, null); // AMQP handles its own flow control for when it's started consumer.setStarted(true); @@ -209,6 +215,11 @@ public class AMQPSessionCallback implements SessionCallback { return consumer; } + private int getPriority(Map properties) { + Number value = properties == null ? null : (Number) properties.get(PRIORITY); + return value == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : value.intValue(); + } + public void startSender(Object brokerConsumer) throws Exception { ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer; // flow control is done at proton diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index 235d699293..4ae4968193 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -78,6 +78,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { @Override public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, + int priority, int windowSize, int maxRate, int ackBatchSize, @@ -88,7 +89,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); - SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true); SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); @@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 45d9fa1ea1..c35bc644ce 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -137,11 +137,10 @@ public class AMQConsumer { } SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName())); - if (openwireDestination.isTopic()) { SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName); - serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); + serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); //only advisory topic consumers need this. ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck); @@ -151,7 +150,7 @@ public class AMQConsumer { } catch (ActiveMQQueueExistsException e) { // ignore } - serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.isBrowser(), false, -1); + serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.getPriority(), info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString()); if (addrSettings != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 3b68d8b837..faefa39221 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -36,13 +36,14 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage; @@ -83,7 +84,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; @@ -323,7 +323,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_CREATECONSUMER: { SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly()); + session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null); if (requiresResponse) { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index 1dfff29e1b..babddc22cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -18,10 +18,12 @@ package org.apache.activemq.artemis.core.server; import java.util.List; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -public interface Consumer { +public interface Consumer extends PriorityAware { /** * @@ -85,6 +87,11 @@ public interface Consumer { /** an unique sequential ID for this consumer */ long sequentialID(); + @Override + default int getPriority() { + return ActiveMQDefaultConfiguration.getDefaultConsumerPriority(); + } + default void errorProcessing(Throwable e, MessageReference reference) { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index cfc3e014d2..3ba3384526 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -107,7 +107,15 @@ public interface ServerSession extends SecurityAuth { void addCloseable(Closeable closeable); - /** + ServerConsumer createConsumer(long consumerID, + SimpleString queueName, + SimpleString filterString, + int priority, + boolean browseOnly, + boolean supportLargeMessage, + Integer credits) throws Exception; + + /** * To be used by protocol heads that needs to control the transaction outside the session context. */ void resetTX(Transaction transaction); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java new file mode 100644 index 0000000000..895b1ee06b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.RepeatableIterator; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Set; + +public interface QueueConsumers extends Iterable, RepeatableIterator, ResettableIterator { + + Set getPriorites(); + + boolean add(T t); + + boolean remove(T t); + + int size(); + + boolean isEmpty(); + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java new file mode 100644 index 0000000000..0495c5172e --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.UpdatableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private UpdatableIterator iterator = new UpdatableIterator<>(consumers.resettableIterator()); + + //-- START :: ResettableIterator Methods + // As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time. + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public void repeat() { + iterator.repeat(); + } + + @Override + public void reset() { + iterator.reset(); + } + + //-- END :: ResettableIterator Methods + + + @Override + public boolean add(T t) { + boolean result = consumers.add(t); + if (result) { + iterator.update(consumers.resettableIterator()); + } + return result; + } + + @Override + public boolean remove(T t) { + boolean result = consumers.remove(t); + if (result) { + iterator.update(consumers.resettableIterator()); + } + return result; + } + + @Override + public int size() { + return consumers.size(); + } + + @Override + public boolean isEmpty() { + return consumers.isEmpty(); + } + + @Override + public Iterator iterator() { + return unmodifiableConsumers.iterator(); + } + + @Override + public void forEach(Consumer action) { + unmodifiableConsumers.forEach(action); + } + + @Override + public Spliterator spliterator() { + return unmodifiableConsumers.spliterator(); + } + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 521ece3896..9f3b53b83e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -21,7 +21,6 @@ import java.io.StringWriter; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -29,9 +28,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; @@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -98,6 +99,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; +import org.apache.activemq.artemis.utils.collections.SingletonIterator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -184,11 +186,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this); - // used to control if we should recalculate certain positions inside deliverAsync - private volatile boolean consumersChanged = true; - - private final List consumerList = new CopyOnWriteArrayList<>(); - private final ScheduledDeliveryHandler scheduledDeliveryHandler; private AtomicLong messagesAdded = new AtomicLong(0); @@ -235,14 +232,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final AtomicInteger consumersCount = new AtomicInteger(); private volatile long consumerRemovedTimestamp = -1; - - private final Set consumerSet = new HashSet<>(); + private final QueueConsumers> consumers = new QueueConsumersImpl<>(); private final Map groups = new HashMap<>(); - private volatile SimpleString expiryAddress; + private volatile Consumer exclusiveConsumer; - private int pos; + private volatile SimpleString expiryAddress; private final ArtemisExecutor executor; @@ -327,7 +323,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { out.println("queueMemorySize=" + queueMemorySize); - for (ConsumerHolder holder : consumerList) { + for (ConsumerHolder holder : consumers) { out.println("consumer: " + holder.consumer.debug()); } @@ -568,6 +564,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void setExclusive(boolean exclusive) { this.exclusive = exclusive; + if (!exclusive) { + exclusiveConsumer = null; + } } @Override @@ -1032,22 +1031,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_CONSUMER); try { synchronized (this) { - if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) { + if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } - consumersChanged = true; - if (!consumer.supportsDirectDelivery()) { this.supportsDirectDeliver = false; } cancelRedistributor(); - - consumerList.add(new ConsumerHolder(consumer)); - - if (consumerSet.add(consumer)) { - int currentConsumerCount = consumersCount.incrementAndGet(); + ConsumerHolder newConsumerHolder = new ConsumerHolder<>(consumer); + if (consumers.add(newConsumerHolder)) { + int currentConsumerCount = consumers.size(); if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); } @@ -1075,33 +1070,33 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_CONSUMER); try { synchronized (this) { - consumersChanged = true; - for (ConsumerHolder holder : consumerList) { + boolean consumerRemoved = false; + for (ConsumerHolder holder : consumers) { if (holder.consumer == consumer) { if (holder.iter != null) { holder.iter.close(); } - consumerList.remove(holder); + consumers.remove(holder); + consumerRemoved = true; break; } } this.supportsDirectDeliver = checkConsumerDirectDeliver(); - if (pos > 0 && pos >= consumerList.size()) { - pos = consumerList.size() - 1; - } - - if (consumerSet.remove(consumer)) { - int currentConsumerCount = consumersCount.decrementAndGet(); + if (consumerRemoved) { consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); - boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0)); + boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0)); if (stopped) { dispatchStartTimeUpdater.set(this, -1); } } + if (consumer == exclusiveConsumer) { + exclusiveConsumer = null; + } + LinkedList groupsToRemove = null; for (SimpleString groupID : groups.keySet()) { @@ -1134,7 +1129,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private boolean checkConsumerDirectDeliver() { boolean supports = true; - for (ConsumerHolder consumerCheck : consumerList) { + for (ConsumerHolder consumerCheck : consumers) { if (!consumerCheck.consumer.supportsDirectDelivery()) { supports = false; } @@ -1157,7 +1152,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (delay > 0) { - if (consumerSet.isEmpty()) { + if (consumers.isEmpty()) { DelayedAddRedistributor dar = new DelayedAddRedistributor(executor); redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); @@ -1194,7 +1189,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int getConsumerCount() { - return consumersCount.get(); + return consumers.size(); } @Override @@ -1203,8 +1198,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized Set getConsumers() { - return new HashSet<>(consumerSet); + public Set getConsumers() { + Set consumersSet = new HashSet<>(this.consumers.size()); + for (ConsumerHolder consumerHolder : consumers) { + consumersSet.add(consumerHolder.consumer); + } + return consumersSet; } @Override @@ -1229,7 +1228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean hasMatchingConsumer(final Message message) { - for (ConsumerHolder holder : consumerList) { + for (ConsumerHolder holder : consumers) { Consumer consumer = holder.consumer; if (consumer instanceof Redistributor) { @@ -1376,18 +1375,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public Map> getDeliveringMessages() { - - List consumerListClone = cloneConsumersList(); + final Iterator> consumerHolderIterator; + synchronized (this) { + consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor); + } Map> mapReturn = new HashMap<>(); - for (ConsumerHolder holder : consumerListClone) { + while (consumerHolderIterator.hasNext()) { + ConsumerHolder holder = consumerHolderIterator.next(); List msgs = holder.consumer.getDeliveringMessages(); if (msgs != null && msgs.size() > 0) { mapReturn.put(holder.consumer.toManagementString(), msgs); } } - return mapReturn; } @@ -1867,7 +1868,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { postOffice.removeBinding(name, tx, true); if (removeConsumers) { - for (ConsumerHolder consumerHolder : consumerList) { + for (ConsumerHolder consumerHolder : consumers) { consumerHolder.consumer.disconnect(); } } @@ -2239,7 +2240,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void resetAllIterators() { - for (ConsumerHolder holder : this.consumerList) { + for (ConsumerHolder holder : this.consumers) { holder.resetIterator(); } if (redistributor != null) { @@ -2420,14 +2421,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Either the iterator is empty or the consumer is busy int noDelivery = 0; - int size = 0; - - int endPos = -1; - int handled = 0; long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT; - + consumers.reset(); while (true) { if (handled == MAX_DELIVERIES_IN_LOOP) { // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too @@ -2465,21 +2462,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { ConsumerHolder holder; if (redistributor == null) { - - if (endPos < 0 || consumersChanged) { - consumersChanged = false; - - size = consumerList.size(); - - endPos = pos - 1; - - if (endPos < 0) { - endPos = size - 1; - noDelivery = 0; - } + if (consumers.hasNext()) { + holder = consumers.next(); + } else { + break; } - - holder = consumerList.get(pos); } else { holder = redistributor; } @@ -2508,7 +2495,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { handled++; - + consumers.reset(); continue; } @@ -2516,37 +2503,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupID); - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); - - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); } else if (status == HandleStatus.BUSY) { try { holder.iter.repeat(); @@ -2561,18 +2539,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { noDelivery++; } else if (status == HandleStatus.NO_MATCH) { // nothing to be done on this case, the iterators will just jump next + consumers.reset(); } } - if (redistributor != null || groupConsumer != null || exclusive) { + if (redistributor != null || groupConsumer != null) { if (noDelivery > 0) { break; } noDelivery = 0; - } else if (pos == endPos) { + } else if (!consumers.hasNext()) { // Round robin'd all - if (noDelivery == size) { + if (noDelivery == this.consumers.size()) { if (handledconsumer != null) { // this shouldn't really happen, // however I'm keeping this as an assertion case future developers ever change the logic here on this class @@ -2587,16 +2566,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { noDelivery = 0; } - - // Only move onto the next position if the consumer on the current position was used. - // When using group we don't need to load balance to the next position - if (redistributor == null && !exclusive && groupConsumer == null) { - pos++; - } - - if (pos >= size) { - pos = 0; - } } if (handledconsumer != null) { @@ -2637,7 +2606,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return null; } else { try { - // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever return ref.getMessage().getGroupID(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e); @@ -2738,15 +2706,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private void internalAddRedistributor(final ArtemisExecutor executor) { // create the redistributor only once if there are no local consumers - if (consumerSet.isEmpty() && redistributor == null) { + if (consumers.isEmpty() && redistributor == null) { if (logger.isTraceEnabled()) { logger.trace("QueueImpl::Adding redistributor on queue " + this.toString()); } redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE))); - consumersChanged = true; - redistributor.consumer.start(); deliverAsync(); @@ -3078,9 +3044,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private boolean deliverDirect(final MessageReference ref) { synchronized (this) { if (!supportsDirectDeliver) { - // this should never happen, but who knows? - // if someone ever change add and removeConsumer, - // this would protect any eventual bug return false; } if (paused || !canDispatch() && redistributor == null) { @@ -3091,45 +3054,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } - int startPos = pos; + consumers.reset(); - int size = consumerList.size(); - - while (true) { - ConsumerHolder holder; - if (redistributor == null) { - holder = consumerList.get(pos); - } else { - holder = redistributor; - } + while (consumers.hasNext() || redistributor != null) { + ConsumerHolder holder = redistributor == null ? consumers.next() : redistributor; Consumer consumer = holder.consumer; - Consumer groupConsumer = null; + final SimpleString groupID = extractGroupID(ref); + Consumer groupConsumer = getGroupConsumer(groupID); - // If a group id is set, then this overrides the consumer chosen round-robin - - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); - - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; - } - - // Only move onto the next position if the consumer on the current position was used. - if (redistributor == null && !exclusive && groupConsumer == null) { - pos++; - } - - if (pos == size) { - pos = 0; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); @@ -3144,11 +3080,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { deliveriesInTransit.countUp(); proceedDeliver(consumer, ref); + consumers.reset(); return true; } - if (pos == startPos || redistributor != null || groupConsumer != null || exclusive) { - // Tried them all + if (redistributor != null || groupConsumer != null) { break; } } @@ -3160,13 +3096,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private Consumer getGroupConsumer(SimpleString groupID) { + Consumer groupConsumer = null; + if (exclusive) { + // If exclusive is set, then this overrides the consumer chosen round-robin + groupConsumer = exclusiveConsumer; + } else { + // If a group id is set, then this overrides the consumer chosen round-robin + if (groupID != null) { + groupConsumer = groups.get(groupID); + } + } + return groupConsumer; + } + private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { - if (groupID != null) { + if (exclusive) { + if (groupConsumer == null) { + exclusiveConsumer = consumer; + } + consumers.repeat(); + } else if (groupID != null) { if (extractGroupSequence(ref) == -1) { groups.remove(groupID); - } - if (groupConsumer == null) { + consumers.repeat(); + } else if (groupConsumer == null) { groups.put(groupID, consumer); + } else { + consumers.repeat(); } } } @@ -3245,19 +3202,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return status; } - private List cloneConsumersList() { - List consumerListClone; - - synchronized (this) { - if (redistributor == null) { - consumerListClone = new ArrayList<>(consumerList); - } else { - consumerListClone = Collections.singletonList(redistributor); - } - } - return consumerListClone; - } - @Override public void postAcknowledge(final MessageReference ref) { QueueImpl queue = (QueueImpl) ref.getQueue(); @@ -3407,7 +3351,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Inner classes // -------------------------------------------------------------------------- - protected static class ConsumerHolder { + protected static class ConsumerHolder implements PriorityAware { ConsumerHolder(final T consumer) { this.consumer = consumer; @@ -3424,6 +3368,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { iter = null; } + private Consumer consumer() { + return consumer; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConsumerHolder that = (ConsumerHolder) o; + return Objects.equals(consumer, that.consumer); + } + + @Override + public int hashCode() { + return Objects.hash(consumer); + } + + @Override + public int getPriority() { + return consumer.getPriority(); + } } private class DelayedAddRedistributor implements Runnable { @@ -3745,19 +3710,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } - Set consumersSet = getConsumers(); - if (consumersSet.size() == 0) { + if (consumers.size() == 0) { logger.debug("There are no consumers, no need to check slow consumer's rate"); return; - } else if (queueRate < (threshold * consumersSet.size())) { + } else if (queueRate < (threshold * consumers.size())) { if (logger.isDebugEnabled()) { logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); } return; } - for (Consumer consumer : consumersSet) { + for (ConsumerHolder consumerHolder : consumers) { + Consumer consumer = consumerHolder.consumer(); if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 477fae6639..48e2f06bf3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; @@ -88,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final Filter filter; + private final int priority; + private final int minLargeMessageSize; private final ServerSession session; @@ -194,12 +197,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final boolean supportLargeMessage, final Integer credits, final ActiveMQServer server) throws Exception { + this(id, session, binding, filter, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); + } + + public ServerConsumerImpl(final long id, + final ServerSession session, + final QueueBinding binding, + final Filter filter, + final int priority, + final boolean started, + final boolean browseOnly, + final StorageManager storageManager, + final SessionCallback callback, + final boolean preAcknowledge, + final boolean strictUpdateDeliveryCount, + final ManagementService managementService, + final boolean supportLargeMessage, + final Integer credits, + final ActiveMQServer server) throws Exception { this.id = id; this.sequentialID = server.getStorageManager().generateID(); this.filter = filter; + this.priority = priority; + this.session = session; this.binding = binding; @@ -501,6 +524,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return filter; } + @Override + public int getPriority() { + return priority; + } + @Override public SimpleString getFilterString() { return filter == null ? null : filter.getFilterString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 53f41dd39c..56c60f53da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.Closeable; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -455,6 +456,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean browseOnly, final boolean supportLargeMessage, final Integer credits) throws Exception { + return this.createConsumer(consumerID, queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), browseOnly, supportLargeMessage, credits); + } + + @Override + public ServerConsumer createConsumer(final long consumerID, + final SimpleString queueName, + final SimpleString filterString, + final int priority, + final boolean browseOnly, + final boolean supportLargeMessage, + final Integer credits) throws Exception { final SimpleString unPrefixedQueueName = removePrefix(queueName); Binding binding = postOffice.getBinding(unPrefixedQueueName); @@ -485,7 +497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { filterString, browseOnly, supportLargeMessage)); } - ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); + ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); consumers.put(consumer.getID(), consumer); if (server.hasBrokerConsumerPlugins()) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java new file mode 100644 index 0000000000..1055af74eb --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class QueueConsumersImplTest { + + private QueueConsumers queueConsumers; + + @Before + public void setUp() { + queueConsumers = new QueueConsumersImpl<>(); + } + + @Test + public void addTest() { + TestPriority testPriority = new TestPriority("hello", 0); + assertFalse(queueConsumers.hasNext()); + + queueConsumers.add(testPriority); + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + + assertEquals(testPriority, queueConsumers.next()); + } + + @Test + public void removeTest() { + TestPriority testPriority = new TestPriority("hello", 0); + assertFalse(queueConsumers.hasNext()); + + queueConsumers.add(testPriority); + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + + queueConsumers.remove(testPriority); + queueConsumers.reset(); + assertFalse(queueConsumers.hasNext()); + + assertEquals(0, queueConsumers.getPriorites().size()); + queueConsumers.remove(testPriority); + queueConsumers.remove(testPriority); + + } + + + + @Test + public void roundRobinTest() { + queueConsumers.add(new TestPriority("A", 127)); + queueConsumers.add(new TestPriority("B", 127)); + queueConsumers.add(new TestPriority("E", 0)); + queueConsumers.add(new TestPriority("D", 20)); + queueConsumers.add(new TestPriority("C", 127)); + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + + assertEquals("A", queueConsumers.next().getName()); + + //Reset iterator should mark start as current position + queueConsumers.reset(); + assertTrue(queueConsumers.hasNext()); + assertEquals("B", queueConsumers.next().getName()); + + assertTrue(queueConsumers.hasNext()); + assertEquals("C", queueConsumers.next().getName()); + + //Expect another A as after reset, we started at B so after A we then expect the next level + assertTrue(queueConsumers.hasNext()); + assertEquals("A", queueConsumers.next().getName()); + + assertTrue(queueConsumers.hasNext()); + assertEquals("D", queueConsumers.next().getName()); + + assertTrue(queueConsumers.hasNext()); + assertEquals("E", queueConsumers.next().getName()); + + //We have iterated all. + assertFalse(queueConsumers.hasNext()); + + //Reset to iterate again. + queueConsumers.reset(); + + //We expect the iteration to round robin from last returned at the level. + assertTrue(queueConsumers.hasNext()); + assertEquals("B", queueConsumers.next().getName()); + + + } + + + + + + private class TestPriority implements PriorityAware { + + private final int priority; + private final String name; + + private TestPriority(String name, int priority) { + this.priority = priority; + this.name = name; + } + + @Override + public int getPriority() { + return priority; + } + + public String getName() { + return name; + } + } + +} \ No newline at end of file diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index d86ed8582c..0aa3085155 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -41,6 +41,7 @@ * [Last-Value Queues](last-value-queues.md) * [Exclusive Queues](exclusive-queues.md) * [Message Grouping](message-grouping.md) +* [Consumer Priority](consumer-priority.md) * [Extra Acknowledge Modes](pre-acknowledge.md) * [Management](management.md) * [Management Console](management-console.md) diff --git a/docs/user-manual/en/consumer-priority.md b/docs/user-manual/en/consumer-priority.md new file mode 100644 index 0000000000..a7a5ca7dc1 --- /dev/null +++ b/docs/user-manual/en/consumer-priority.md @@ -0,0 +1,45 @@ +# Consumer Priority + +Consumer priorities allow you to ensure that high priority consumers receive messages while they are active. + +Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority. + +Messages will only going to lower priority consumers when the high priority consumers do not have credit available to consume the message, or those high priority consumers have declined to accept the message (for instance because it does not meet the criteria of any selectors associated with the consumer). + +Where a consumer does not set, the default priority 0 is used. + +## Core + +#### JMS Example + + +When using the JMS Client you can set the priority to be used, by using address parameters when +creating the destination used by the consumer. + +```java +Queue queue = session.createQueue("my.destination.name?consmer-priority=50"); +Topic topic = session.createTopic("my.destination.name?consmer-priority=50"); + +consumer = session.createConsumer(queue); +``` + +The range of priority values is -231 to 231-1. + +## OpenWire + +####JMS Example + +The priority for a consumer is set using Destination Options as follows: + +```java +queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); +consumer = session.createConsumer(queue); +``` + +Because of the limitation of OpenWire, the range of priority values is: 0 to 127. The highest priority is 127. + +## AMQP + +In AMQP 1.0 the priority of the consumer is set in the properties map of the attach frame where the broker side of the link represents the sending side of the link. + +The key for the entry must be the literal string priority, and the value of the entry must be an integral number in the range -231 to 231-1. \ No newline at end of file diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index fb4e4daa04..dc83ef751e 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -79,6 +79,7 @@ public class AmqpReceiver extends AmqpAbstractResource { private String selector; private boolean presettle; private boolean noLocal; + private Map properties; private AsyncResult pullRequest; private AsyncResult stopRequest; @@ -173,6 +174,14 @@ public class AmqpReceiver extends AmqpAbstractResource { } } + public void setProperties(Map properties) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.properties = properties; + } + /** * Detach the receiver, a closed receiver will throw exceptions if any further send calls are * made. @@ -782,7 +791,9 @@ public class AmqpReceiver extends AmqpAbstractResource { } else { receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); } - + if (properties != null) { + receiver.setProperties(properties); + } setEndpoint(receiver); super.doOpen(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 8c331cae17..53d45e33b2 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -306,6 +306,12 @@ public class AmqpSession extends AmqpAbstractResource { return createReceiver(address, selector, noLocal, false); } + public AmqpReceiver createReceiver(String address, + String selector, + boolean noLocal, + boolean presettle) throws Exception { + return createReceiver(address, selector, noLocal, presettle, null); + } /** * Create a receiver instance using the given address * @@ -313,13 +319,15 @@ public class AmqpSession extends AmqpAbstractResource { * @param selector the JMS selector to use for the subscription * @param noLocal should the subscription have messages from its connection filtered. * @param presettle should the receiver be created with a settled sender mode. + * @param properties to set on the receiver * @return a newly created receiver that is ready for use. * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createReceiver(String address, String selector, boolean noLocal, - boolean presettle) throws Exception { + boolean presettle, + Map properties) throws Exception { checkClosed(); final ClientFuture request = new ClientFuture(); @@ -330,6 +338,9 @@ public class AmqpSession extends AmqpAbstractResource { if (selector != null && !selector.isEmpty()) { receiver.setSelector(selector); } + if (properties != null && !properties.isEmpty()) { + receiver.setProperties(properties); + } connection.getScheduler().execute(new Runnable() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java new file mode 100644 index 0000000000..95011d3a5f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 30000) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receiveNoWait(); + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message3 = receiver3.receiveNoWait(); + assertNotNull("did not receive message first time", message2); + assertEquals("MessageID:" + i, message2.getMessageId()); + message2.accept(); + assertNull("message is not meant to goto lower priority receiver", message1); + assertNull("message is not meant to goto lower priority receiver", message3); + } + assertNoMessage(receiver1); + assertNoMessage(receiver3); + + //Close the high priority receiver + receiver2.close(); + + sendMessages(getQueueName(), 5); + + //Check messages now goto next priority receiver + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receiveNoWait(); + AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS); + assertNotNull("did not receive message first time", message3); + assertEquals("MessageID:" + i, message3.getMessageId()); + message3.accept(); + assertNull("message is not meant to goto lower priority receiver", message1); + } + assertNoMessage(receiver1); + + + connection.close(); + } + + public void assertNoMessage(AmqpReceiver receiver) throws Exception { + //A check to make sure no messages + AmqpMessage message = receiver.receive(250, TimeUnit.MILLISECONDS); + assertNull("message is not meant to goto lower priority receiver", message); + } + + @Test(timeout = 30000) + public void testPriorityProvidedAsByte() throws Exception { + testPriorityNumber((byte) 5); + } + + @Test(timeout = 30000) + public void testPriorityProvidedAsUnsignedInteger() throws Exception { + testPriorityNumber(UnsignedInteger.valueOf(5)); + } + + + private void testPriorityNumber(Number number) throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), number); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + sendMessages(getQueueName(), 2); + + + for (int i = 0; i < 2; i++) { + AmqpMessage message1 = receiver1.receiveNoWait(); + assertNotNull("did not receive message first time", message1); + assertEquals("MessageID:" + i, message1.getMessageId()); + message1.accept(); + } + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java new file mode 100644 index 0000000000..47b9f5615c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +/** + * Consumer Priority Test + */ +public class ConsumerPriorityTest extends JMSTestBase { + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + + protected ConnectionFactory getCF() throws Exception { + return cf; + } + + @Test + public void testConsumerPriorityQueueConsumerSettingUsingAddressQueueParameters() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + String queueName = getName(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + Queue queue1 = session.createQueue(queueName + "?consumer-priority=3"); + Queue queue2 = session.createQueue(queueName + "?consumer-priority=2"); + Queue queue3 = session.createQueue(queueName + "?consumer-priority=1"); + + assertEquals(queueName, queue.getQueueName()); + + ActiveMQDestination b = (ActiveMQDestination) queue1; + assertEquals(3, b.getQueueAttributes().getConsumerPriority().intValue()); + ActiveMQDestination c = (ActiveMQDestination) queue2; + assertEquals(2, c.getQueueAttributes().getConsumerPriority().intValue()); + ActiveMQDestination d = (ActiveMQDestination) queue3; + assertEquals(1, d.getQueueAttributes().getConsumerPriority().intValue()); + + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue1); + MessageConsumer consumer2 = session.createConsumer(queue2); + MessageConsumer consumer3 = session.createConsumer(queue3); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 100; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + } finally { + connection.close(); + } + } + + @Test + public void testConsumerPriorityQueueConsumerRoundRobin() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + String queueName = getName(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + Queue queue1 = session.createQueue(queueName + "?consumer-priority=3"); + Queue queue2 = session.createQueue(queueName + "?consumer-priority=3"); + Queue queue3 = session.createQueue(queueName + "?consumer-priority=1"); + + + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue1); + MessageConsumer consumer2 = session.createConsumer(queue2); + MessageConsumer consumer3 = session.createConsumer(queue3); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + message.setIntProperty("counter", j); + producer.send(message); + } + + + //All msgs should go to the first two consumers, round robin'd + for (int j = 0; j < 50; j += 2) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + TextMessage tm2 = (TextMessage) consumer2.receive(10000); + assertNotNull(tm2); + + assertEquals("Message" + (j + 1), tm2.getText()); + + + + TextMessage tm3 = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm3); + } + } finally { + connection.close(); + } + } + + @Test + public void testConsumerPriorityQueueConsumerFailover() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + String queueName = getName(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + Queue queue1 = session.createQueue(queueName + "?consumer-priority=3"); + Queue queue2 = session.createQueue(queueName + "?consumer-priority=2"); + Queue queue3 = session.createQueue(queueName + "?consumer-priority=1"); + + + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer1 = session.createConsumer(queue1); + MessageConsumer consumer2 = session.createConsumer(queue2); + MessageConsumer consumer3 = session.createConsumer(queue3); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 50; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + consumer1.close(); + + //All msgs should now go to the next consumer only, without any errors or exceptions + for (int j = 50; j < 100; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + } finally { + connection.close(); + } + } + + + @Test + public void testConsumerPriorityTopicSharedConsumerFailover() throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + String topicName = getName(); + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + + String subscriptionName = "sharedsub"; + Topic topicConsumer1 = session.createTopic(topicName + "?consumer-priority=3"); + Topic topicConsumer2 = session.createTopic(topicName + "?consumer-priority=2"); + Topic topicConsumer3 = session.createTopic(topicName + "?consumer-priority=1"); + + + MessageConsumer consumer1 = session.createSharedDurableConsumer(topicConsumer1, subscriptionName); + MessageConsumer consumer2 = session.createSharedDurableConsumer(topicConsumer2, subscriptionName); + MessageConsumer consumer3 = session.createSharedDurableConsumer(topicConsumer3, subscriptionName); + + connection.start(); + + for (int j = 0; j < 100; j++) { + TextMessage message = session.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + + //All msgs should go to the first consumer + for (int j = 0; j < 50; j++) { + TextMessage tm = (TextMessage) consumer1.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer2.receiveNoWait(); + assertNull(tm); + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + consumer1.close(); + + //All msgs should now go to the next consumer only, without any errors or exceptions + for (int j = 50; j < 100; j++) { + TextMessage tm = (TextMessage) consumer2.receive(10000); + + assertNotNull(tm); + + assertEquals("Message" + j, tm.getText()); + + tm = (TextMessage) consumer3.receiveNoWait(); + assertNull(tm); + } + + + } finally { + connection.close(); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java new file mode 100644 index 0000000000..637e9d6d7d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + + +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Before; +import org.junit.Test; + +public class QueueConsumerPriorityTest extends BasicOpenWireTest { + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + } + @Test + public void testQueueConsumerPriority() throws JMSException, InterruptedException { + connection.start(); + Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(consumerHighPriority); + Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "QUEUE.A"; + ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1"); + MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low); + + ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2"); + MessageConsumer highConsumer = consumerLowPriority.createConsumer(high); + + ActiveMQQueue senderQueue = new ActiveMQQueue(queueName); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + for (int i = 0; i < 1000; i++) { + producer.send(msg); + assertNotNull("null on iteration: " + i, highConsumer.receive(1000)); + } + assertNull(lowConsumer.receive(250)); + } +} diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java index 7a91c21420..ecd0f78fd2 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java @@ -1174,6 +1174,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase { return null; } + @Override + public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, boolean browseOnly) throws ActiveMQException { + return null; + } + @Override public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, @@ -1183,6 +1188,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase { return null; } + @Override + public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, int windowSize, int maxRate, boolean browseOnly) throws ActiveMQException { + return null; + } + @Override public ClientConsumer createConsumer(final String queueName) throws ActiveMQException { return null; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java index a68382f79c..a9a39c131b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -746,6 +746,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase { return null; } + @Override + public int getPriority() { + return 0; + } + public long getID() { return 0;