ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs
This commit is contained in:
Michael André Pearce 2019-01-10 20:09:02 +00:00 committed by Clebert Suconic
parent 71d5494345
commit 7dfa0fe7f4
49 changed files with 2681 additions and 187 deletions

View File

@ -32,6 +32,7 @@ public class QueueAttributes implements Serializable {
public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
public static final String CONSUMER_PRIORITY = "consumer-priority";
private RoutingType routingType;
private SimpleString filterString;
@ -44,6 +45,7 @@ public class QueueAttributes implements Serializable {
private Boolean purgeOnNoConsumers;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Integer consumerPriority;
public void set(String key, String value) {
if (key != null && value != null) {
@ -69,6 +71,8 @@ public class QueueAttributes implements Serializable {
setConsumersBeforeDispatch(Integer.valueOf(value));
} else if (key.equals(DELAY_BEFORE_DISPATCH)) {
setDelayBeforeDispatch(Long.valueOf(value));
} else if (key.equals(CONSUMER_PRIORITY)) {
setConsumerPriority(Integer.valueOf(value));
}
}
}
@ -172,4 +176,13 @@ public class QueueAttributes implements Serializable {
return this;
}
public Integer getConsumerPriority() {
return consumerPriority;
}
public QueueAttributes setConsumerPriority(Integer consumerPriority) {
this.consumerPriority = consumerPriority;
return this;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core;
public interface PriorityAware {
int getPriority();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Collection;
/**
* Provides an Array Iterator that is able to reset, allowing you to iterate over the full array.
* It achieves this though by moving end position mark to the the current cursors position,
* so it round robins, even with reset.
* @param <T>
*/
public class ArrayResettableIterator<T> implements ResettableIterator<T> {
private final Object[] array;
private int cursor = 0;
private int endPos = -1;
private boolean hasNext;
public ArrayResettableIterator(Object[] array) {
this.array = array;
reset();
}
public static <T> ResettableIterator<T> iterator(Collection<T> collection) {
return new ArrayResettableIterator<>(collection.toArray());
}
@Override
public void reset() {
endPos = cursor;
hasNext = array.length > 0;
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public T next() {
if (!hasNext) {
throw new IllegalStateException();
}
@SuppressWarnings("unchecked") T result = (T) array[cursor];
cursor++;
if (cursor == array.length) {
cursor = 0;
}
if (cursor == endPos) {
hasNext = false;
}
return result;
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;
/**
* Provides an Iterator that works over multiple underlying iterators.
*
* @param <T> type of the class of the iterator.
*/
public class MultiIterator<T> extends MultiIteratorBase<T, Iterator<T>> {
public MultiIterator(Iterator<T>[] iterators) {
super(iterators);
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;
/**
* Provides an Abstract Iterator that works over multiple underlying iterators.
*
* @param <T> type of the class of the iterator.
* @param <I> type of the iterator
*/
abstract class MultiIteratorBase<T, I extends Iterator<T>> implements Iterator<T> {
private final I[] iterators;
private int index = -1;
MultiIteratorBase(I[] iterators) {
this.iterators = iterators;
}
@Override
public boolean hasNext() {
while (true) {
if (index != -1) {
I currentIterator = get(index);
if (currentIterator.hasNext()) {
return true;
}
}
int next = index + 1;
if (next < iterators.length) {
moveTo(next);
} else {
return false;
}
}
}
@Override
public T next() {
while (true) {
if (index != -1) {
I currentIterator = get(index);
if (currentIterator.hasNext()) {
return currentIterator.next();
}
}
int next = index + 1;
if (next < iterators.length) {
moveTo(next);
} else {
return null;
}
}
}
protected void moveTo(int index) {
this.index = index;
}
protected I get(int index) {
return iterators[index];
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
/**
* Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
* It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
*
* @param <T> type of the class of the iterator.
*/
public class MultiResettableIterator<T> extends MultiIteratorBase<T, ResettableIterator<T>> implements ResettableIterator<T> {
public MultiResettableIterator(ResettableIterator<T>[] iterators) {
super(iterators);
}
@Override
protected void moveTo(int index) {
super.moveTo(index);
if (index > -1) {
get(index).reset();
}
}
@Override
public void reset() {
moveTo(-1);
}
}

View File

@ -0,0 +1,324 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import org.apache.activemq.artemis.core.PriorityAware;
import java.lang.reflect.Array;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* This class's purpose is to hold the the different collections used for each priority level.
*
* A supplier is required to provide the underlying collection needed when a new priority level is seen,
* and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
* if list, then list semantics.
*
* Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
*
* @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
*/
public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
private final Supplier<Collection<T>> supplier;
private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
private volatile int size;
private void setArray(PriorityHolder<T>[] priorityHolders) {
this.priorityHolders = priorityHolders;
}
private PriorityHolder<T>[] getArray() {
return priorityHolders;
}
public PriorityCollection(Supplier<Collection<T>> supplier) {
this.supplier = supplier;
}
@SuppressWarnings("unchecked")
private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size() == 0;
}
public Set<Integer> getPriorites() {
PriorityHolder<T>[] snapshot = getArray();
return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
}
@Override
public Iterator<T> iterator() {
Iterator<T>[] iterators = getIterators();
return new MultiIterator<>(iterators);
}
private Iterator<T>[] getIterators() {
PriorityHolder<T>[] snapshot = this.getArray();
int size = snapshot.length;
Iterator<T>[] iterators = newIteratorArrayInstance(size);
for (int i = 0; i < size; i++) {
iterators[i] = snapshot[i].getValues().iterator();
}
return iterators;
}
@SuppressWarnings("unchecked")
private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
}
public ResettableIterator<T> resettableIterator() {
return new MultiResettableIterator<T>(getResettableIterators());
}
private ResettableIterator<T>[] getResettableIterators() {
PriorityHolder<T>[] snapshot = this.getArray();
int size = snapshot.length;
ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
for (int i = 0; i < size; i++) {
iterators[i] = ArrayResettableIterator.iterator(snapshot[i].getValues());
}
return iterators;
}
@SuppressWarnings("unchecked")
private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
}
@Override
public void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
PriorityHolder<T>[] current = getArray();
int len = current.length;
for (int i = 0; i < len; ++i) {
current[i].getValues().forEach(action);
}
}
private Collection<T> getCollection(int priority, boolean createIfMissing) {
PriorityHolder<T>[] current = getArray();
int low = 0;
int high = current.length - 1;
while (low <= high) {
int mid = (low + high) / 2;
PriorityHolder<T> midVal = current[mid];
if (midVal.getPriority() > priority)
low = mid + 1;
else if (midVal.getPriority() < priority)
high = mid - 1;
else
return midVal.getValues(); //key found
}
if (createIfMissing) {
PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
if (low > 0) {
System.arraycopy(current, 0, newArray, 0, low);
}
if (current.length - low > 0) {
System.arraycopy(current, low, newArray, low + 1, current.length - low);
}
newArray[low] = new PriorityHolder<T>(priority, supplier);
setArray(newArray);
return newArray[low].getValues();
}
return null;
}
@Override
public synchronized boolean add(T t) {
if (size() == Integer.MAX_VALUE) return false;
boolean result = addInternal(t);
calcSize();
return result;
}
private boolean addInternal(T t) {
if (t == null) return false;
Collection<T> priority = getCollection(t.getPriority(), true);
return priority.add(t);
}
@Override
public synchronized boolean remove(Object o) {
boolean result = removeInternal(o);
calcSize();
return result;
}
private boolean removeInternal(Object o) {
if (o instanceof PriorityAware) {
PriorityAware priorityAware = (PriorityAware) o;
Collection<T> priority = getCollection(priorityAware.getPriority(), false);
boolean result = priority != null && priority.remove(priorityAware);
if (priority != null && priority.isEmpty()) {
removeCollection(priorityAware.getPriority());
}
return result;
} else {
return false;
}
}
private Collection<T> removeCollection(int priority) {
PriorityHolder<T>[] current = getArray();
int len = current.length;
int low = 0;
int high = len - 1;
while (low <= high) {
int mid = (low + high) / 2;
PriorityHolder<T> midVal = current[mid];
if (midVal.getPriority() > priority)
low = mid + 1;
else if (midVal.getPriority() < priority)
high = mid - 1;
else {
PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(len - 1);
System.arraycopy(current, 0, newArray, 0, mid);
System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1);
setArray(newArray);
return midVal.getValues(); //key found
}
}
return null;
}
@Override
public boolean containsAll(Collection<?> c) {
Objects.requireNonNull(c);
for (Object e : c)
if (!contains(e))
return false;
return true;
}
@Override
public synchronized boolean addAll(Collection<? extends T> c) {
Objects.requireNonNull(c);
if (size() >= Integer.MAX_VALUE - c.size()) return false;
boolean modified = false;
for (T e : c)
if (addInternal(e))
modified = true;
calcSize();
return modified;
}
@Override
public synchronized boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
boolean modified = false;
for (Object o : c) {
if (removeInternal(o)) {
modified = true;
}
}
calcSize();
return modified;
}
@Override
public synchronized boolean retainAll(Collection<?> c) {
Objects.requireNonNull(c);
boolean modified = false;
PriorityHolder<T>[] snapshot = getArray();
for (PriorityHolder<T> priorityHolder : snapshot) {
if (priorityHolder.getValues().retainAll(c)) {
modified = true;
if (priorityHolder.getValues().isEmpty()) {
removeCollection(priorityHolder.getPriority());
}
}
}
calcSize();
return modified;
}
@Override
public synchronized void clear() {
PriorityHolder<T>[] snapshot = getArray();
for (PriorityHolder<T> priorityHolder : snapshot) {
priorityHolder.getValues().clear();
}
calcSize();
}
@Override
public boolean contains(Object o) {
return o instanceof PriorityAware && contains((PriorityAware) o);
}
public boolean contains(PriorityAware priorityAware) {
if (priorityAware == null) return false;
Collection<T> prioritySet = getCollection(priorityAware.getPriority(), false);
return prioritySet != null && prioritySet.contains(priorityAware);
}
private void calcSize() {
PriorityHolder<T>[] current = getArray();
int size = 0;
for (PriorityHolder<T> priorityHolder : current) {
size += priorityHolder.getValues().size();
}
this.size = size;
}
public static class PriorityHolder<E> implements PriorityAware {
private final int priority;
private final Collection<E> values;
public PriorityHolder(int priority, Supplier<Collection<E>> supplier) {
this.priority = priority;
this.values = supplier.get();
}
@Override
public int getPriority() {
return priority;
}
public Collection<E> getValues() {
return values;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;
public interface RepeatableIterator<E> extends Iterator<E> {
/**
* If the current value should repeat.
*/
void repeat();
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.function.Consumer;
public class RepeatableIteratorWrapper<E> implements RepeatableIterator<E>, ResettableIterator<E> {
private ResettableIterator<E> iterator;
private E last;
private boolean repeat;
public RepeatableIteratorWrapper(ResettableIterator<E> iterator) {
this.iterator = iterator;
}
@Override
public void repeat() {
if (last != null) {
repeat = true;
}
}
@Override
public boolean hasNext() {
return repeat || iterator.hasNext();
}
@Override
public E next() {
if (repeat) {
repeat = false;
return last;
}
return last = iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
iterator.forEachRemaining(action);
}
@Override
public void reset() {
iterator.reset();
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;
public interface ResettableIterator<E> extends Iterator<E> {
/**
* Resets the iterator so you can re-iterate over all elements.
*/
void reset();
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
public class SingletonIterator<E> implements Iterator<E> {
private E value;
public static <E> Iterator<E> newInstance(E e) {
return new SingletonIterator<>(e);
}
private SingletonIterator(E value) {
this.value = value;
}
@Override
public boolean hasNext() {
return value != null;
}
@Override
public E next() {
if (value != null) {
E result = value;
value = null;
return result;
} else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
value = null;
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
if (value != null)
action.accept(value);
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
public class UpdatableIterator<E> implements ResettableIterator<E>, RepeatableIterator<E> {
private final AtomicReferenceFieldUpdater<UpdatableIterator, RepeatableIteratorWrapper> changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(UpdatableIterator.class, RepeatableIteratorWrapper.class, "changedIterator");
private volatile RepeatableIteratorWrapper<E> changedIterator;
private RepeatableIteratorWrapper<E> currentIterator;
public UpdatableIterator(ResettableIterator<E> iterator) {
this.currentIterator = new RepeatableIteratorWrapper<>(iterator);
}
/**
* This can be called by another thread.
* It sets a new iterator, that will be picked up on the next reset.
*
* @param iterator the new iterator to update to.
*/
public void update(ResettableIterator<E> iterator) {
changedIteratorFieldUpdater.set(this, new RepeatableIteratorWrapper<>(iterator));
}
/*
* ---- ResettableIterator Methods -----
* All the below ResettableIterator (including reset) methods MUST be called by the same thread,
* this is as any other use of Iterator.
*/
/**
* When reset is called, then if a new iterator has been provided by another thread via update method,
* then we switch over to using the new iterator.
*
* It is important that on nulling off the changedIterator, we atomically compare and set as the
* changedIterator could be further updated by another thread whilst we are resetting,
* the subsequent update simply would be picked up on the next reset.
*
* @return this (itself).
*/
@Override
public void reset() {
RepeatableIteratorWrapper<E> changedIterator = this.changedIterator;
if (changedIterator != null) {
currentIterator = changedIterator;
changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
}
currentIterator.reset();
}
@Override
public boolean hasNext() {
return currentIterator.hasNext();
}
@Override
public E next() {
return currentIterator.next();
}
@Override
public void remove() {
currentIterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
currentIterator.forEachRemaining(action);
}
@Override
public void repeat() {
currentIterator.repeat();
}
}

View File

@ -0,0 +1,75 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class MultiIteratorTest {
@Test
public void testSingleIterator() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
MultiIterator<Integer> iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator()});
for (int i = 0; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
@Test
public void testMutlipleIterators() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
ArrayList<Integer> arrayList2 = new ArrayList<>();
for (int i = 1000; i < 2000; i++) {
arrayList2.add(i);
}
ArrayList<Integer> arrayList3 = new ArrayList<>();
for (int i = 2000; i < 3000; i++) {
arrayList3.add(i);
}
MultiIterator<Integer> iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator(), arrayList2.iterator(), arrayList3.iterator()});
for (int i = 0; i < 3000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
}

View File

@ -0,0 +1,127 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import org.junit.Test;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class MultiResettableIteratorTest {
@Test
public void testSingleIterator() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList)});
for (int i = 0; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
iterator.reset();
for (int i = 0; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
@Test
public void testMutlipleIterators() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
ArrayList<Integer> arrayList2 = new ArrayList<>();
for (int i = 1000; i < 2000; i++) {
arrayList2.add(i);
}
ArrayList<Integer> arrayList3 = new ArrayList<>();
for (int i = 2000; i < 3000; i++) {
arrayList3.add(i);
}
MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)});
for (int i = 0; i < 3000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
//Reset and ensure we re-iterate all.
iterator.reset();
for (int i = 0; i < 3000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
@Test
public void testMutlipleIteratorsResetMidIteration() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
ArrayList<Integer> arrayList2 = new ArrayList<>();
for (int i = 1000; i < 2000; i++) {
arrayList2.add(i);
}
ArrayList<Integer> arrayList3 = new ArrayList<>();
for (int i = 2000; i < 3000; i++) {
arrayList3.add(i);
}
MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)});
for (int i = 0; i < 100; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
//Reset and ensure we re-iterate all.
iterator.reset();
for (int i = 0; i < 3000; i++) {
assertTrue(iterator.hasNext());
assertNotNull(iterator.next());
}
assertFalse(iterator.hasNext());
}
}

View File

@ -0,0 +1,296 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import org.apache.activemq.artemis.core.PriorityAware;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class PriorityCollectionTest {
@Test
public void simpleInsertions() {
PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
assertTrue(set.isEmpty());
assertTrue(set.add(new TestPriorityAware(1)));
assertFalse(set.isEmpty());
assertTrue(set.add(new TestPriorityAware(2)));
assertTrue(set.add(new TestPriorityAware(3)));
assertEquals(set.size(), 3);
assertTrue(set.contains(new TestPriorityAware(1)));
assertEquals(set.size(), 3);
assertTrue(set.remove(new TestPriorityAware(1)));
assertEquals(set.size(), 2);
assertFalse(set.contains(new TestPriorityAware(1)));
assertFalse(set.contains(new TestPriorityAware(5)));
assertEquals(set.size(), 2);
assertTrue(set.add(new TestPriorityAware(1)));
assertEquals(set.size(), 3);
assertFalse(set.add(new TestPriorityAware(1)));
assertEquals(set.size(), 3);
}
@Test
public void testRemove() {
PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
assertTrue(set.isEmpty());
assertTrue(set.add(new TestPriorityAware(1)));
assertFalse(set.isEmpty());
assertFalse(set.remove(new TestPriorityAware(0)));
assertFalse(set.isEmpty());
assertTrue(set.remove(new TestPriorityAware(1)));
assertTrue(set.isEmpty());
}
@Test
public void concurrentInsertions() throws Throwable {
PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 1000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
set.add(new TestPriorityAware(key));
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(set.size(), N * nThreads);
executor.shutdown();
}
@Test
public void concurrentInsertionsAndReads() throws Throwable {
PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 1000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
set.add(new TestPriorityAware(key));
}
}));
futures.add(executor.submit(() -> {
Iterator<TestPriorityAware> iterator = set.resettableIterator();
while (iterator.hasNext()) {
iterator.next();
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(set.size(), N * nThreads);
executor.shutdown();
}
@Test
public void testIteration() {
PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
assertFalse(set.iterator().hasNext());
set.add(new TestPriorityAware(0));
assertTrue(set.iterator().hasNext());
set.remove(new TestPriorityAware(0));
assertFalse(set.iterator().hasNext());
set.add(new TestPriorityAware(0));
set.add(new TestPriorityAware(1));
set.add(new TestPriorityAware(2));
List<TestPriorityAware> values = new ArrayList<>(set);
assertTrue(values.contains(new TestPriorityAware(0)));
assertTrue(values.contains(new TestPriorityAware(1)));
assertTrue(values.contains(new TestPriorityAware(2)));
set.clear();
assertTrue(set.isEmpty());
}
@Test
public void priorityTest() {
PriorityCollection<TestPriority> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
set.add(new TestPriority("A", 127));
set.add(new TestPriority("B", 127));
set.add(new TestPriority("E", 0));
set.add(new TestPriority("D", 20));
set.add(new TestPriority("C", 127));
ResettableIterator<TestPriority> iterator = set.resettableIterator();
iterator.reset();
assertTrue(iterator.hasNext());
assertEquals("A", iterator.next().getName());
//Reset iterator should mark start as current position
iterator.reset();
assertTrue(iterator.hasNext());
assertEquals("B", iterator.next().getName());
assertTrue(iterator.hasNext());
assertEquals("C", iterator.next().getName());
//Expect another A as after reset, we started at B so after A we then expect the next level
assertTrue(iterator.hasNext());
assertEquals("A", iterator.next().getName());
assertTrue(iterator.hasNext());
assertEquals("D", iterator.next().getName());
assertTrue(iterator.hasNext());
assertEquals("E", iterator.next().getName());
//We have iterated all.
assertFalse(iterator.hasNext());
//Reset to iterate again.
iterator.reset();
//We expect the iteration to round robin from last returned at the level.
assertTrue(iterator.hasNext());
assertEquals("B", iterator.next().getName());
}
private class TestPriorityAware implements PriorityAware {
private long value;
private TestPriorityAware(long value) {
this.value = value;
}
@Override
public int getPriority() {
//10 priority levels
return (int) value % 10;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestPriorityAware that = (TestPriorityAware) o;
return value == that.value;
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}
private class TestPriority implements PriorityAware {
private String name;
private int priority;
private TestPriority(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int getPriority() {
return priority;
}
public String getName() {
return name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestPriority that = (TestPriority) o;
return priority == that.priority &&
Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name, priority);
}
}
}

View File

@ -0,0 +1,95 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import org.junit.Test;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class UpdatableIteratorTest {
@Test
public void testUnderlyingIterator() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
UpdatableIterator<Integer> iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList));
for (int i = 0; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
iterator.reset();
for (int i = 0; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
@Test
public void testUpdateIterator() {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
arrayList.add(i);
}
ArrayList<Integer> arrayList2 = new ArrayList<>();
for (int i = 4000; i < 5000; i++) {
arrayList2.add(i);
}
UpdatableIterator<Integer> iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList));
for (int i = 0; i < 100; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
//Update the iterator
iterator.update(ArrayResettableIterator.iterator(arrayList2));
//Ensure the current iterator in use is not updated until reset, and we iterate remaining.
for (int i = 100; i < 1000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
//Reset the iterator, we now expect to act on the updated iterator.
iterator.reset();
for (int i = 4000; i < 5000; i++) {
assertTrue(iterator.hasNext());
assertEquals(Integer.valueOf(i), iterator.next());
}
assertFalse(iterator.hasNext());
}
}

View File

@ -473,6 +473,8 @@ public final class ActiveMQDefaultConfiguration {
public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1;
public static final int DEFAULT_CONSUMER_PRIORITY = 0;
public static final boolean DEFAULT_EXCLUSIVE = false;
public static final boolean DEFAULT_LAST_VALUE = false;
@ -1320,6 +1322,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MAX_QUEUE_CONSUMERS;
}
public static int getDefaultConsumerPriority() {
return DEFAULT_CONSUMER_PRIORITY;
}
public static boolean getDefaultExclusive() {
return DEFAULT_EXCLUSIVE;
}

View File

@ -865,6 +865,30 @@ public interface ClientSession extends XAResource, AutoCloseable {
SimpleString filter,
boolean browseOnly) throws ActiveMQException;
/**
* Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
* the given name.
* <p>
* If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
* from the queue but they will not be consumed (the messages will remain in the queue). Note
* that paged messages will not be in the queue, and will therefore not be visible if
* {@code browseOnly} is {@code true}.
* <p>
* If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will behave like consume
* the messages from the queue and the messages will effectively be removed from the queue.
*
* @param queueName name of the queue to consume messages from
* @param filter only messages which match this filter will be consumed
* @param priority the consumer priority
* @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
* @return a ClientConsumer
* @throws ActiveMQException if an exception occurs while creating the ClientConsumer
*/
ClientConsumer createConsumer(SimpleString queueName,
SimpleString filter,
int priority,
boolean browseOnly) throws ActiveMQException;
/**
* Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
* the given name.
@ -891,6 +915,35 @@ public interface ClientSession extends XAResource, AutoCloseable {
int maxRate,
boolean browseOnly) throws ActiveMQException;
/**
* Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
* the given name.
* <p>
* If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
* from the queue but they will not be consumed (the messages will remain in the queue). Note
* that paged messages will not be in the queue, and will therefore not be visible if
* {@code browseOnly} is {@code true}.
* <p>
* If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will behave like consume
* the messages from the queue and the messages will effectively be removed from the queue.
*
* @param queueName name of the queue to consume messages from
* @param filter only messages which match this filter will be consumed
* @param priority the consumer priority
* @param windowSize the consumer window size
* @param maxRate the maximum rate to consume messages
* @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
* @return a ClientConsumer
* @throws ActiveMQException if an exception occurs while creating the ClientConsumer
*/
ClientConsumer createConsumer(SimpleString queueName,
SimpleString filter,
int priority,
int windowSize,
int maxRate,
boolean browseOnly) throws ActiveMQException;
/**
* Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
* the given name.

View File

@ -70,6 +70,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private final SimpleString filterString;
private final int priority;
private final SimpleString queueName;
private final boolean browseOnly;
@ -141,6 +143,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
final ConsumerContext consumerContext,
final SimpleString queueName,
final SimpleString filterString,
final int priority,
final boolean browseOnly,
final int initialWindow,
final int clientWindowSize,
@ -157,6 +160,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
this.filterString = filterString;
this.priority = priority;
this.browseOnly = browseOnly;
this.sessionContext = sessionContext;
@ -562,6 +567,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
return filterString;
}
@Override
public int getPriority() {
return priority;
}
@Override
public SimpleString getQueueName() {
return queueName;

View File

@ -29,6 +29,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
SimpleString getFilterString();
int getPriority();
boolean isBrowseOnly();
void handleMessage(ClientMessageInternal message) throws Exception;

View File

@ -798,6 +798,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly);
}
@Override
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final int priority,
final boolean browseOnly) throws ActiveMQException {
return createConsumer(queueName, filterString, priority, consumerWindowSize, consumerMaxRate, browseOnly);
}
@Override
public ClientConsumer createConsumer(final SimpleString queueName,
final boolean browseOnly) throws ActiveMQException {
@ -821,6 +829,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return sessionContext.isWritable(callback);
}
@Override
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final int windowSize,
final int maxRate,
final boolean browseOnly) throws ActiveMQException {
return createConsumer(queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), windowSize, maxRate, browseOnly);
}
/**
* Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
* the remoting thread).
@ -834,10 +851,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final int priority,
final int windowSize,
final int maxRate,
final boolean browseOnly) throws ActiveMQException {
return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly);
return internalCreateConsumer(queueName, filterString, priority, windowSize, maxRate, browseOnly);
}
@Override
@ -1931,12 +1949,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
*/
private ClientConsumer internalCreateConsumer(final SimpleString queueName,
final SimpleString filterString,
final int priority,
final int windowSize,
final int maxRate,
final boolean browseOnly) throws ActiveMQException {
checkClosed();
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
addConsumer(consumer);

View File

@ -41,6 +41,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
}
default boolean isVersionSupportConsumerPriority() {
int version = getChannelVersion();
return version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION;
}
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types

View File

@ -367,6 +367,7 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
int priority,
int windowSize,
int maxRate,
int ackBatchSize,
@ -377,7 +378,7 @@ public class ActiveMQSessionContext extends SessionContext {
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
SessionQueueQueryResponseMessage queueInfo;
@ -392,7 +393,7 @@ public class ActiveMQSessionContext extends SessionContext {
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
@Override
@ -875,7 +876,7 @@ public class ActiveMQSessionContext extends SessionContext {
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}
SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false);
SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.getPriority(), consumerInternal.isBrowseOnly(), false);
sendPacketWithoutLock(sessionChannel, createConsumerRequest);

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
@ -39,8 +40,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@ -91,7 +92,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;

View File

@ -33,7 +33,9 @@ public class PacketImpl implements Packet {
public static final int ADDRESSING_CHANGE_VERSION = 129;
// 2.7.0
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
public static final int ARTEMIS_2_7_0_VERSION = 130;
public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@ -318,7 +320,7 @@ public class PacketImpl implements Packet {
encodeHeader(buffer);
encodeRest(buffer);
encodeRest(buffer, connection);
encodeSize(buffer);
@ -394,6 +396,10 @@ public class PacketImpl implements Packet {
public void encodeRest(final ActiveMQBuffer buffer) {
}
public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) {
encodeRest(buffer);
}
public void decodeRest(final ActiveMQBuffer buffer) {
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
public class SessionCreateConsumerMessage extends QueueAbstractPacket {
@ -25,6 +27,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
private SimpleString filterString;
private int priority;
private boolean browseOnly;
private boolean requiresResponse;
@ -32,6 +36,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
public SessionCreateConsumerMessage(final long id,
final SimpleString queueName,
final SimpleString filterString,
final int priority,
final boolean browseOnly,
final boolean requiresResponse) {
super(SESS_CREATECONSUMER);
@ -39,6 +44,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
this.id = id;
this.queueName = queueName;
this.filterString = filterString;
this.priority = priority;
this.browseOnly = browseOnly;
this.requiresResponse = requiresResponse;
}
@ -55,6 +61,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
buff.append(", id=" + id);
buff.append(", browseOnly=" + browseOnly);
buff.append(", requiresResponse=" + requiresResponse);
buff.append(", priority=" + priority);
buff.append("]");
return buff.toString();
}
@ -67,6 +74,10 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
return filterString;
}
public int getPriority() {
return priority;
}
public boolean isBrowseOnly() {
return browseOnly;
}
@ -84,17 +95,25 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
this.filterString = filterString;
}
public void setPriority(byte priority) {
this.priority = priority;
}
public void setBrowseOnly(boolean browseOnly) {
this.browseOnly = browseOnly;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) {
buffer.writeLong(id);
buffer.writeSimpleString(queueName);
buffer.writeNullableSimpleString(filterString);
buffer.writeBoolean(browseOnly);
buffer.writeBoolean(requiresResponse);
//Priority Support added in 2.7.0
if (coreRemotingConnection.isVersionSupportConsumerPriority()) {
buffer.writeInt(priority);
}
}
@Override
@ -104,6 +123,12 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
filterString = buffer.readNullableSimpleString();
browseOnly = buffer.readBoolean();
requiresResponse = buffer.readBoolean();
//Priority Support Added in 2.7.0
if (buffer.readableBytes() > 0) {
priority = buffer.readInt();
} else {
priority = ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
}
}
@Override
@ -113,6 +138,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
result = prime * result + (browseOnly ? 1231 : 1237);
result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
result = prime * result + (int) (id ^ (id >>> 32));
result = prime * result + priority;
result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
result = prime * result + (requiresResponse ? 1231 : 1237);
return result;
@ -134,6 +160,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
return false;
} else if (!filterString.equals(other.filterString))
return false;
if (priority != other.priority)
return false;
if (id != other.id)
return false;
if (queueName == null) {

View File

@ -314,6 +314,7 @@ public abstract class SessionContext {
public abstract ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
int priority,
int windowSize,
int maxRate,
int ackBatchSize,

View File

@ -51,6 +51,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.QueueAttributes;
@ -710,7 +711,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
consumer = session.createConsumer(queueName, null, false);
consumer = createClientConsumer(dest, queueName, null);
ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(options, connection, this, consumer, false, dest, selectorString, autoDeleteQueueName);
@ -779,7 +780,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
connection.addKnownDestination(dest.getSimpleAddress());
consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false);
consumer = createClientConsumer(dest, null, coreFilterString);
} else {
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
@ -804,8 +805,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
consumer = session.createConsumer(queueName, null, false);
consumer = createClientConsumer(dest, queueName, null);
autoDeleteQueueName = queueName;
} else {
// Durable sub
@ -860,7 +860,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
consumer = session.createConsumer(queueName, null, false);
consumer = createClientConsumer(dest, queueName, null);
}
}
@ -874,6 +874,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
return session.createConsumer(queueName == null ? destination.getSimpleAddress() : queueName, coreFilterString, priority, false);
}
public void ackAllConsumers() throws JMSException {
checkClosed();
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@ -80,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
private static final Symbol PRIORITY = Symbol.getSymbol("priority");
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
private final AMQPConnectionCallback protonSPI;
@ -199,7 +203,9 @@ public class AMQPSessionCallback implements SessionCallback {
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
int priority = getPriority(protonSender.getSender().getRemoteProperties());
ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), priority, browserOnly, false, null);
// AMQP handles its own flow control for when it's started
consumer.setStarted(true);
@ -209,6 +215,11 @@ public class AMQPSessionCallback implements SessionCallback {
return consumer;
}
private int getPriority(Map<Symbol, Object> properties) {
Number value = properties == null ? null : (Number) properties.get(PRIORITY);
return value == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : value.intValue();
}
public void startSender(Object brokerConsumer) throws Exception {
ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
// flow control is done at proton

View File

@ -78,6 +78,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
int priority,
int windowSize,
int maxRate,
int ackBatchSize,
@ -88,7 +89,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
// could be overridden on the queue settings
// The value we send is just a hint
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
}

View File

@ -137,11 +137,10 @@ public class AMQConsumer {
}
SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
if (openwireDestination.isTopic()) {
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
@ -151,7 +150,7 @@ public class AMQConsumer {
} catch (ActiveMQQueueExistsException e) {
// ignore
}
serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.isBrowser(), false, -1);
serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
if (addrSettings != null) {

View File

@ -36,13 +36,14 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@ -83,7 +84,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@ -323,7 +323,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_CREATECONSUMER: {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null);
if (requiresResponse) {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover

View File

@ -18,10 +18,12 @@ package org.apache.activemq.artemis.core.server;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public interface Consumer {
public interface Consumer extends PriorityAware {
/**
*
@ -85,6 +87,11 @@ public interface Consumer {
/** an unique sequential ID for this consumer */
long sequentialID();
@Override
default int getPriority() {
return ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
}
default void errorProcessing(Throwable e, MessageReference reference) {
}

View File

@ -107,7 +107,15 @@ public interface ServerSession extends SecurityAuth {
void addCloseable(Closeable closeable);
/**
ServerConsumer createConsumer(long consumerID,
SimpleString queueName,
SimpleString filterString,
int priority,
boolean browseOnly,
boolean supportLargeMessage,
Integer credits) throws Exception;
/**
* To be used by protocol heads that needs to control the transaction outside the session context.
*/
void resetTX(Transaction transaction);

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.utils.collections.RepeatableIterator;
import org.apache.activemq.artemis.utils.collections.ResettableIterator;
import java.util.Set;
public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, RepeatableIterator<T>, ResettableIterator<T> {
Set<Integer> getPriorites();
boolean add(T t);
boolean remove(T t);
int size();
boolean isEmpty();
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.utils.collections.PriorityCollection;
import org.apache.activemq.artemis.utils.collections.UpdatableIterator;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
/**
* This class's purpose is to hold the consumers.
*
* CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
* but also lock less for a read path, which is our HOT path.
* Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
*
* There can only be one resettable iterable view,
* A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
* the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
*
* All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
*
* @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
* but intent is this is the QueueImpl:ConsumerHolder.
*/
public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
private UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
//-- START :: ResettableIterator Methods
// As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time.
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
@Override
public void repeat() {
iterator.repeat();
}
@Override
public void reset() {
iterator.reset();
}
//-- END :: ResettableIterator Methods
@Override
public boolean add(T t) {
boolean result = consumers.add(t);
if (result) {
iterator.update(consumers.resettableIterator());
}
return result;
}
@Override
public boolean remove(T t) {
boolean result = consumers.remove(t);
if (result) {
iterator.update(consumers.resettableIterator());
}
return result;
}
@Override
public int size() {
return consumers.size();
}
@Override
public boolean isEmpty() {
return consumers.isEmpty();
}
@Override
public Iterator<T> iterator() {
return unmodifiableConsumers.iterator();
}
@Override
public void forEach(Consumer<? super T> action) {
unmodifiableConsumers.forEach(action);
}
@Override
public Spliterator<T> spliterator() {
return unmodifiableConsumers.spliterator();
}
@Override
public Set<Integer> getPriorites() {
return consumers.getPriorites();
}
}

View File

@ -21,7 +21,6 @@ import java.io.StringWriter;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -29,9 +28,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -98,6 +99,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.collections.SingletonIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@ -184,11 +186,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
// used to control if we should recalculate certain positions inside deliverAsync
private volatile boolean consumersChanged = true;
private final List<ConsumerHolder> consumerList = new CopyOnWriteArrayList<>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private AtomicLong messagesAdded = new AtomicLong(0);
@ -235,14 +232,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final AtomicInteger consumersCount = new AtomicInteger();
private volatile long consumerRemovedTimestamp = -1;
private final Set<Consumer> consumerSet = new HashSet<>();
private final QueueConsumers<ConsumerHolder<? extends Consumer>> consumers = new QueueConsumersImpl<>();
private final Map<SimpleString, Consumer> groups = new HashMap<>();
private volatile SimpleString expiryAddress;
private volatile Consumer exclusiveConsumer;
private int pos;
private volatile SimpleString expiryAddress;
private final ArtemisExecutor executor;
@ -327,7 +323,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
out.println("queueMemorySize=" + queueMemorySize);
for (ConsumerHolder holder : consumerList) {
for (ConsumerHolder holder : consumers) {
out.println("consumer: " + holder.consumer.debug());
}
@ -568,6 +564,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
if (!exclusive) {
exclusiveConsumer = null;
}
}
@Override
@ -1032,22 +1031,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_CONSUMER);
try {
synchronized (this) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
consumersChanged = true;
if (!consumer.supportsDirectDelivery()) {
this.supportsDirectDeliver = false;
}
cancelRedistributor();
consumerList.add(new ConsumerHolder(consumer));
if (consumerSet.add(consumer)) {
int currentConsumerCount = consumersCount.incrementAndGet();
ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
if (consumers.add(newConsumerHolder)) {
int currentConsumerCount = consumers.size();
if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
}
@ -1075,33 +1070,33 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_CONSUMER);
try {
synchronized (this) {
consumersChanged = true;
for (ConsumerHolder holder : consumerList) {
boolean consumerRemoved = false;
for (ConsumerHolder holder : consumers) {
if (holder.consumer == consumer) {
if (holder.iter != null) {
holder.iter.close();
}
consumerList.remove(holder);
consumers.remove(holder);
consumerRemoved = true;
break;
}
}
this.supportsDirectDeliver = checkConsumerDirectDeliver();
if (pos > 0 && pos >= consumerList.size()) {
pos = consumerList.size() - 1;
}
if (consumerSet.remove(consumer)) {
int currentConsumerCount = consumersCount.decrementAndGet();
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0));
boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0));
if (stopped) {
dispatchStartTimeUpdater.set(this, -1);
}
}
if (consumer == exclusiveConsumer) {
exclusiveConsumer = null;
}
LinkedList<SimpleString> groupsToRemove = null;
for (SimpleString groupID : groups.keySet()) {
@ -1134,7 +1129,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private boolean checkConsumerDirectDeliver() {
boolean supports = true;
for (ConsumerHolder consumerCheck : consumerList) {
for (ConsumerHolder consumerCheck : consumers) {
if (!consumerCheck.consumer.supportsDirectDelivery()) {
supports = false;
}
@ -1157,7 +1152,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (delay > 0) {
if (consumerSet.isEmpty()) {
if (consumers.isEmpty()) {
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
@ -1194,7 +1189,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public int getConsumerCount() {
return consumersCount.get();
return consumers.size();
}
@Override
@ -1203,8 +1198,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized Set<Consumer> getConsumers() {
return new HashSet<>(consumerSet);
public Set<Consumer> getConsumers() {
Set<Consumer> consumersSet = new HashSet<>(this.consumers.size());
for (ConsumerHolder<? extends Consumer> consumerHolder : consumers) {
consumersSet.add(consumerHolder.consumer);
}
return consumersSet;
}
@Override
@ -1229,7 +1228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean hasMatchingConsumer(final Message message) {
for (ConsumerHolder holder : consumerList) {
for (ConsumerHolder holder : consumers) {
Consumer consumer = holder.consumer;
if (consumer instanceof Redistributor) {
@ -1376,18 +1375,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public Map<String, List<MessageReference>> getDeliveringMessages() {
List<ConsumerHolder> consumerListClone = cloneConsumersList();
final Iterator<ConsumerHolder<? extends Consumer>> consumerHolderIterator;
synchronized (this) {
consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor);
}
Map<String, List<MessageReference>> mapReturn = new HashMap<>();
for (ConsumerHolder holder : consumerListClone) {
while (consumerHolderIterator.hasNext()) {
ConsumerHolder holder = consumerHolderIterator.next();
List<MessageReference> msgs = holder.consumer.getDeliveringMessages();
if (msgs != null && msgs.size() > 0) {
mapReturn.put(holder.consumer.toManagementString(), msgs);
}
}
return mapReturn;
}
@ -1867,7 +1868,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
postOffice.removeBinding(name, tx, true);
if (removeConsumers) {
for (ConsumerHolder consumerHolder : consumerList) {
for (ConsumerHolder consumerHolder : consumers) {
consumerHolder.consumer.disconnect();
}
}
@ -2239,7 +2240,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void resetAllIterators() {
for (ConsumerHolder holder : this.consumerList) {
for (ConsumerHolder holder : this.consumers) {
holder.resetIterator();
}
if (redistributor != null) {
@ -2420,14 +2421,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Either the iterator is empty or the consumer is busy
int noDelivery = 0;
int size = 0;
int endPos = -1;
int handled = 0;
long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
consumers.reset();
while (true) {
if (handled == MAX_DELIVERIES_IN_LOOP) {
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
@ -2465,21 +2462,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ConsumerHolder<? extends Consumer> holder;
if (redistributor == null) {
if (endPos < 0 || consumersChanged) {
consumersChanged = false;
size = consumerList.size();
endPos = pos - 1;
if (endPos < 0) {
endPos = size - 1;
noDelivery = 0;
}
if (consumers.hasNext()) {
holder = consumers.next();
} else {
break;
}
holder = consumerList.get(pos);
} else {
holder = redistributor;
}
@ -2508,7 +2495,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
handled++;
consumers.reset();
continue;
}
@ -2516,37 +2503,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
// If a group id is set, then this overrides the consumer chosen round-robin
final SimpleString groupID = extractGroupID(ref);
groupConsumer = getGroupConsumer(groupID);
SimpleString groupID = extractGroupID(ref);
if (groupID != null) {
groupConsumer = groups.get(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
}
}
if (exclusive && redistributor == null) {
consumer = consumerList.get(0).consumer;
if (groupConsumer != null) {
consumer = groupConsumer;
}
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
deliveriesInTransit.countUp();
handledconsumer = consumer;
removeMessageReference(holder, ref);
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
deliveriesInTransit.countUp();
removeMessageReference(holder, ref);
handledconsumer = consumer;
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
try {
holder.iter.repeat();
@ -2561,18 +2539,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
noDelivery++;
} else if (status == HandleStatus.NO_MATCH) {
// nothing to be done on this case, the iterators will just jump next
consumers.reset();
}
}
if (redistributor != null || groupConsumer != null || exclusive) {
if (redistributor != null || groupConsumer != null) {
if (noDelivery > 0) {
break;
}
noDelivery = 0;
} else if (pos == endPos) {
} else if (!consumers.hasNext()) {
// Round robin'd all
if (noDelivery == size) {
if (noDelivery == this.consumers.size()) {
if (handledconsumer != null) {
// this shouldn't really happen,
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
@ -2587,16 +2566,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
noDelivery = 0;
}
// Only move onto the next position if the consumer on the current position was used.
// When using group we don't need to load balance to the next position
if (redistributor == null && !exclusive && groupConsumer == null) {
pos++;
}
if (pos >= size) {
pos = 0;
}
}
if (handledconsumer != null) {
@ -2637,7 +2606,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return null;
} else {
try {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
return ref.getMessage().getGroupID();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e);
@ -2738,15 +2706,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void internalAddRedistributor(final ArtemisExecutor executor) {
// create the redistributor only once if there are no local consumers
if (consumerSet.isEmpty() && redistributor == null) {
if (consumers.isEmpty() && redistributor == null) {
if (logger.isTraceEnabled()) {
logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
}
redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
consumersChanged = true;
redistributor.consumer.start();
deliverAsync();
@ -3078,9 +3044,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private boolean deliverDirect(final MessageReference ref) {
synchronized (this) {
if (!supportsDirectDeliver) {
// this should never happen, but who knows?
// if someone ever change add and removeConsumer,
// this would protect any eventual bug
return false;
}
if (paused || !canDispatch() && redistributor == null) {
@ -3091,45 +3054,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
int startPos = pos;
consumers.reset();
int size = consumerList.size();
while (true) {
ConsumerHolder<? extends Consumer> holder;
if (redistributor == null) {
holder = consumerList.get(pos);
} else {
holder = redistributor;
}
while (consumers.hasNext() || redistributor != null) {
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
Consumer consumer = holder.consumer;
Consumer groupConsumer = null;
final SimpleString groupID = extractGroupID(ref);
Consumer groupConsumer = getGroupConsumer(groupID);
// If a group id is set, then this overrides the consumer chosen round-robin
SimpleString groupID = extractGroupID(ref);
if (groupID != null) {
groupConsumer = groups.get(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
}
}
if (exclusive && redistributor == null) {
consumer = consumerList.get(0).consumer;
}
// Only move onto the next position if the consumer on the current position was used.
if (redistributor == null && !exclusive && groupConsumer == null) {
pos++;
}
if (pos == size) {
pos = 0;
if (groupConsumer != null) {
consumer = groupConsumer;
}
HandleStatus status = handle(ref, consumer);
@ -3144,11 +3080,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
consumers.reset();
return true;
}
if (pos == startPos || redistributor != null || groupConsumer != null || exclusive) {
// Tried them all
if (redistributor != null || groupConsumer != null) {
break;
}
}
@ -3160,13 +3096,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private Consumer getGroupConsumer(SimpleString groupID) {
Consumer groupConsumer = null;
if (exclusive) {
// If exclusive is set, then this overrides the consumer chosen round-robin
groupConsumer = exclusiveConsumer;
} else {
// If a group id is set, then this overrides the consumer chosen round-robin
if (groupID != null) {
groupConsumer = groups.get(groupID);
}
}
return groupConsumer;
}
private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
if (groupID != null) {
if (exclusive) {
if (groupConsumer == null) {
exclusiveConsumer = consumer;
}
consumers.repeat();
} else if (groupID != null) {
if (extractGroupSequence(ref) == -1) {
groups.remove(groupID);
}
if (groupConsumer == null) {
consumers.repeat();
} else if (groupConsumer == null) {
groups.put(groupID, consumer);
} else {
consumers.repeat();
}
}
}
@ -3245,19 +3202,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return status;
}
private List<ConsumerHolder> cloneConsumersList() {
List<ConsumerHolder> consumerListClone;
synchronized (this) {
if (redistributor == null) {
consumerListClone = new ArrayList<>(consumerList);
} else {
consumerListClone = Collections.singletonList(redistributor);
}
}
return consumerListClone;
}
@Override
public void postAcknowledge(final MessageReference ref) {
QueueImpl queue = (QueueImpl) ref.getQueue();
@ -3407,7 +3351,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Inner classes
// --------------------------------------------------------------------------
protected static class ConsumerHolder<T extends Consumer> {
protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
ConsumerHolder(final T consumer) {
this.consumer = consumer;
@ -3424,6 +3368,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
iter = null;
}
private Consumer consumer() {
return consumer;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConsumerHolder<?> that = (ConsumerHolder<?>) o;
return Objects.equals(consumer, that.consumer);
}
@Override
public int hashCode() {
return Objects.hash(consumer);
}
@Override
public int getPriority() {
return consumer.getPriority();
}
}
private class DelayedAddRedistributor implements Runnable {
@ -3745,19 +3710,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
Set<Consumer> consumersSet = getConsumers();
if (consumersSet.size() == 0) {
if (consumers.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else if (queueRate < (threshold * consumersSet.size())) {
} else if (queueRate < (threshold * consumers.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
return;
}
for (Consumer consumer : consumersSet) {
for (ConsumerHolder consumerHolder : consumers) {
Consumer consumer = consumerHolder.consumer();
if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate();

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
@ -88,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final Filter filter;
private final int priority;
private final int minLargeMessageSize;
private final ServerSession session;
@ -194,12 +197,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final boolean supportLargeMessage,
final Integer credits,
final ActiveMQServer server) throws Exception {
this(id, session, binding, filter, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
}
public ServerConsumerImpl(final long id,
final ServerSession session,
final QueueBinding binding,
final Filter filter,
final int priority,
final boolean started,
final boolean browseOnly,
final StorageManager storageManager,
final SessionCallback callback,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final ManagementService managementService,
final boolean supportLargeMessage,
final Integer credits,
final ActiveMQServer server) throws Exception {
this.id = id;
this.sequentialID = server.getStorageManager().generateID();
this.filter = filter;
this.priority = priority;
this.session = session;
this.binding = binding;
@ -501,6 +524,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return filter;
}
@Override
public int getPriority() {
return priority;
}
@Override
public SimpleString getFilterString() {
return filter == null ? null : filter.getFilterString();

View File

@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@ -455,6 +456,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean browseOnly,
final boolean supportLargeMessage,
final Integer credits) throws Exception {
return this.createConsumer(consumerID, queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), browseOnly, supportLargeMessage, credits);
}
@Override
public ServerConsumer createConsumer(final long consumerID,
final SimpleString queueName,
final SimpleString filterString,
final int priority,
final boolean browseOnly,
final boolean supportLargeMessage,
final Integer credits) throws Exception {
final SimpleString unPrefixedQueueName = removePrefix(queueName);
Binding binding = postOffice.getBinding(unPrefixedQueueName);
@ -485,7 +497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
filterString, browseOnly, supportLargeMessage));
}
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
if (server.hasBrokerConsumerPlugins()) {

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.PriorityAware;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class QueueConsumersImplTest {
private QueueConsumers<TestPriority> queueConsumers;
@Before
public void setUp() {
queueConsumers = new QueueConsumersImpl<>();
}
@Test
public void addTest() {
TestPriority testPriority = new TestPriority("hello", 0);
assertFalse(queueConsumers.hasNext());
queueConsumers.add(testPriority);
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals(testPriority, queueConsumers.next());
}
@Test
public void removeTest() {
TestPriority testPriority = new TestPriority("hello", 0);
assertFalse(queueConsumers.hasNext());
queueConsumers.add(testPriority);
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
queueConsumers.remove(testPriority);
queueConsumers.reset();
assertFalse(queueConsumers.hasNext());
assertEquals(0, queueConsumers.getPriorites().size());
queueConsumers.remove(testPriority);
queueConsumers.remove(testPriority);
}
@Test
public void roundRobinTest() {
queueConsumers.add(new TestPriority("A", 127));
queueConsumers.add(new TestPriority("B", 127));
queueConsumers.add(new TestPriority("E", 0));
queueConsumers.add(new TestPriority("D", 20));
queueConsumers.add(new TestPriority("C", 127));
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals("A", queueConsumers.next().getName());
//Reset iterator should mark start as current position
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals("B", queueConsumers.next().getName());
assertTrue(queueConsumers.hasNext());
assertEquals("C", queueConsumers.next().getName());
//Expect another A as after reset, we started at B so after A we then expect the next level
assertTrue(queueConsumers.hasNext());
assertEquals("A", queueConsumers.next().getName());
assertTrue(queueConsumers.hasNext());
assertEquals("D", queueConsumers.next().getName());
assertTrue(queueConsumers.hasNext());
assertEquals("E", queueConsumers.next().getName());
//We have iterated all.
assertFalse(queueConsumers.hasNext());
//Reset to iterate again.
queueConsumers.reset();
//We expect the iteration to round robin from last returned at the level.
assertTrue(queueConsumers.hasNext());
assertEquals("B", queueConsumers.next().getName());
}
private class TestPriority implements PriorityAware {
private final int priority;
private final String name;
private TestPriority(String name, int priority) {
this.priority = priority;
this.name = name;
}
@Override
public int getPriority() {
return priority;
}
public String getName() {
return name;
}
}
}

View File

@ -41,6 +41,7 @@
* [Last-Value Queues](last-value-queues.md)
* [Exclusive Queues](exclusive-queues.md)
* [Message Grouping](message-grouping.md)
* [Consumer Priority](consumer-priority.md)
* [Extra Acknowledge Modes](pre-acknowledge.md)
* [Management](management.md)
* [Management Console](management-console.md)

View File

@ -0,0 +1,45 @@
# Consumer Priority
Consumer priorities allow you to ensure that high priority consumers receive messages while they are active.
Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.
Messages will only going to lower priority consumers when the high priority consumers do not have credit available to consume the message, or those high priority consumers have declined to accept the message (for instance because it does not meet the criteria of any selectors associated with the consumer).
Where a consumer does not set, the default priority <b>0</b> is used.
## Core
#### JMS Example
When using the JMS Client you can set the priority to be used, by using address parameters when
creating the destination used by the consumer.
```java
Queue queue = session.createQueue("my.destination.name?consmer-priority=50");
Topic topic = session.createTopic("my.destination.name?consmer-priority=50");
consumer = session.createConsumer(queue);
```
The range of priority values is -2<sup>31</sup> to 2<sup>31</sup>-1.
## OpenWire
####JMS Example
The priority for a consumer is set using Destination Options as follows:
```java
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
```
Because of the limitation of OpenWire, the range of priority values is: 0 to 127. The highest priority is 127.
## AMQP
In AMQP 1.0 the priority of the consumer is set in the properties map of the attach frame where the broker side of the link represents the sending side of the link.
The key for the entry must be the literal string priority, and the value of the entry must be an integral number in the range -2<sup>31</sup> to 2<sup>31</sup>-1.

View File

@ -79,6 +79,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private String selector;
private boolean presettle;
private boolean noLocal;
private Map<Symbol, Object> properties;
private AsyncResult pullRequest;
private AsyncResult stopRequest;
@ -173,6 +174,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
}
public void setProperties(Map<Symbol, Object> properties) {
if (getEndpoint() != null) {
throw new IllegalStateException("Endpoint already established");
}
this.properties = properties;
}
/**
* Detach the receiver, a closed receiver will throw exceptions if any further send calls are
* made.
@ -782,7 +791,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} else {
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
if (properties != null) {
receiver.setProperties(properties);
}
setEndpoint(receiver);
super.doOpen();

View File

@ -306,6 +306,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return createReceiver(address, selector, noLocal, false);
}
public AmqpReceiver createReceiver(String address,
String selector,
boolean noLocal,
boolean presettle) throws Exception {
return createReceiver(address, selector, noLocal, presettle, null);
}
/**
* Create a receiver instance using the given address
*
@ -313,13 +319,15 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @param selector the JMS selector to use for the subscription
* @param noLocal should the subscription have messages from its connection filtered.
* @param presettle should the receiver be created with a settled sender mode.
* @param properties to set on the receiver
* @return a newly created receiver that is ready for use.
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(String address,
String selector,
boolean noLocal,
boolean presettle) throws Exception {
boolean presettle,
Map<Symbol, Object> properties) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
@ -330,6 +338,9 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
if (selector != null && !selector.isEmpty()) {
receiver.setSelector(selector);
}
if (properties != null && !properties.isEmpty()) {
receiver.setProperties(properties);
}
connection.getScheduler().execute(new Runnable() {

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Test various behaviors of AMQP receivers with the broker.
*/
public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testPriority() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Map<Symbol, Object> properties1 = new HashMap<>();
properties1.put(Symbol.getSymbol("priority"), 5);
AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
receiver1.flow(100);
Map<Symbol, Object> properties2 = new HashMap<>();
properties2.put(Symbol.getSymbol("priority"), 50);
AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
receiver2.flow(100);
Map<Symbol, Object> properties3 = new HashMap<>();
properties3.put(Symbol.getSymbol("priority"), 10);
AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
receiver3.flow(100);
sendMessages(getQueueName(), 5);
for (int i = 0; i < 5; i++) {
AmqpMessage message1 = receiver1.receiveNoWait();
AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
AmqpMessage message3 = receiver3.receiveNoWait();
assertNotNull("did not receive message first time", message2);
assertEquals("MessageID:" + i, message2.getMessageId());
message2.accept();
assertNull("message is not meant to goto lower priority receiver", message1);
assertNull("message is not meant to goto lower priority receiver", message3);
}
assertNoMessage(receiver1);
assertNoMessage(receiver3);
//Close the high priority receiver
receiver2.close();
sendMessages(getQueueName(), 5);
//Check messages now goto next priority receiver
for (int i = 0; i < 5; i++) {
AmqpMessage message1 = receiver1.receiveNoWait();
AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
assertNotNull("did not receive message first time", message3);
assertEquals("MessageID:" + i, message3.getMessageId());
message3.accept();
assertNull("message is not meant to goto lower priority receiver", message1);
}
assertNoMessage(receiver1);
connection.close();
}
public void assertNoMessage(AmqpReceiver receiver) throws Exception {
//A check to make sure no messages
AmqpMessage message = receiver.receive(250, TimeUnit.MILLISECONDS);
assertNull("message is not meant to goto lower priority receiver", message);
}
@Test(timeout = 30000)
public void testPriorityProvidedAsByte() throws Exception {
testPriorityNumber((byte) 5);
}
@Test(timeout = 30000)
public void testPriorityProvidedAsUnsignedInteger() throws Exception {
testPriorityNumber(UnsignedInteger.valueOf(5));
}
private void testPriorityNumber(Number number) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Map<Symbol, Object> properties1 = new HashMap<>();
properties1.put(Symbol.getSymbol("priority"), number);
AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
receiver1.flow(100);
sendMessages(getQueueName(), 2);
for (int i = 0; i < 2; i++) {
AmqpMessage message1 = receiver1.receiveNoWait();
assertNotNull("did not receive message first time", message1);
assertEquals("MessageID:" + i, message1.getMessageId());
message1.accept();
}
connection.close();
}
}

View File

@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
/**
* Consumer Priority Test
*/
public class ConsumerPriorityTest extends JMSTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
protected ConnectionFactory getCF() throws Exception {
return cf;
}
@Test
public void testConsumerPriorityQueueConsumerSettingUsingAddressQueueParameters() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
String queueName = getName();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
Queue queue2 = session.createQueue(queueName + "?consumer-priority=2");
Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
assertEquals(queueName, queue.getQueueName());
ActiveMQDestination b = (ActiveMQDestination) queue1;
assertEquals(3, b.getQueueAttributes().getConsumerPriority().intValue());
ActiveMQDestination c = (ActiveMQDestination) queue2;
assertEquals(2, c.getQueueAttributes().getConsumerPriority().intValue());
ActiveMQDestination d = (ActiveMQDestination) queue3;
assertEquals(1, d.getQueueAttributes().getConsumerPriority().intValue());
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
MessageConsumer consumer3 = session.createConsumer(queue3);
connection.start();
for (int j = 0; j < 100; j++) {
TextMessage message = session.createTextMessage();
message.setText("Message" + j);
producer.send(message);
}
//All msgs should go to the first consumer
for (int j = 0; j < 100; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
tm = (TextMessage) consumer2.receiveNoWait();
assertNull(tm);
tm = (TextMessage) consumer3.receiveNoWait();
assertNull(tm);
}
} finally {
connection.close();
}
}
@Test
public void testConsumerPriorityQueueConsumerRoundRobin() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
String queueName = getName();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
Queue queue2 = session.createQueue(queueName + "?consumer-priority=3");
Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
MessageConsumer consumer3 = session.createConsumer(queue3);
connection.start();
for (int j = 0; j < 100; j++) {
TextMessage message = session.createTextMessage();
message.setText("Message" + j);
message.setIntProperty("counter", j);
producer.send(message);
}
//All msgs should go to the first two consumers, round robin'd
for (int j = 0; j < 50; j += 2) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
TextMessage tm2 = (TextMessage) consumer2.receive(10000);
assertNotNull(tm2);
assertEquals("Message" + (j + 1), tm2.getText());
TextMessage tm3 = (TextMessage) consumer3.receiveNoWait();
assertNull(tm3);
}
} finally {
connection.close();
}
}
@Test
public void testConsumerPriorityQueueConsumerFailover() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
String queueName = getName();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
Queue queue2 = session.createQueue(queueName + "?consumer-priority=2");
Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
MessageConsumer consumer3 = session.createConsumer(queue3);
connection.start();
for (int j = 0; j < 100; j++) {
TextMessage message = session.createTextMessage();
message.setText("Message" + j);
producer.send(message);
}
//All msgs should go to the first consumer
for (int j = 0; j < 50; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
tm = (TextMessage) consumer2.receiveNoWait();
assertNull(tm);
tm = (TextMessage) consumer3.receiveNoWait();
assertNull(tm);
}
consumer1.close();
//All msgs should now go to the next consumer only, without any errors or exceptions
for (int j = 50; j < 100; j++) {
TextMessage tm = (TextMessage) consumer2.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
tm = (TextMessage) consumer3.receiveNoWait();
assertNull(tm);
}
} finally {
connection.close();
}
}
@Test
public void testConsumerPriorityTopicSharedConsumerFailover() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
String topicName = getName();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
String subscriptionName = "sharedsub";
Topic topicConsumer1 = session.createTopic(topicName + "?consumer-priority=3");
Topic topicConsumer2 = session.createTopic(topicName + "?consumer-priority=2");
Topic topicConsumer3 = session.createTopic(topicName + "?consumer-priority=1");
MessageConsumer consumer1 = session.createSharedDurableConsumer(topicConsumer1, subscriptionName);
MessageConsumer consumer2 = session.createSharedDurableConsumer(topicConsumer2, subscriptionName);
MessageConsumer consumer3 = session.createSharedDurableConsumer(topicConsumer3, subscriptionName);
connection.start();
for (int j = 0; j < 100; j++) {
TextMessage message = session.createTextMessage();
message.setText("Message" + j);
producer.send(message);
}
//All msgs should go to the first consumer
for (int j = 0; j < 50; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
tm = (TextMessage) consumer2.receiveNoWait();
assertNull(tm);
tm = (TextMessage) consumer3.receiveNoWait();
assertNull(tm);
}
consumer1.close();
//All msgs should now go to the next consumer only, without any errors or exceptions
for (int j = 50; j < 100; j++) {
TextMessage tm = (TextMessage) consumer2.receive(10000);
assertNotNull(tm);
assertEquals("Message" + j, tm.getText());
tm = (TextMessage) consumer3.receiveNoWait();
assertNull(tm);
}
} finally {
connection.close();
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.amq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Before;
import org.junit.Test;
public class QueueConsumerPriorityTest extends BasicOpenWireTest {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.makeSureCoreQueueExist("QUEUE.A");
}
@Test
public void testQueueConsumerPriority() throws JMSException, InterruptedException {
connection.start();
Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(consumerHighPriority);
Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "QUEUE.A";
ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
MessageProducer producer = senderSession.createProducer(senderQueue);
Message msg = senderSession.createTextMessage("test");
for (int i = 0; i < 1000; i++) {
producer.send(msg);
assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
}
assertNull(lowConsumer.receive(250));
}
}

View File

@ -1174,6 +1174,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
return null;
}
@Override
public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, boolean browseOnly) throws ActiveMQException {
return null;
}
@Override
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
@ -1183,6 +1188,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
return null;
}
@Override
public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, int windowSize, int maxRate, boolean browseOnly) throws ActiveMQException {
return null;
}
@Override
public ClientConsumer createConsumer(final String queueName) throws ActiveMQException {
return null;

View File

@ -746,6 +746,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
return null;
}
@Override
public int getPriority() {
return 0;
}
public long getID() {
return 0;