mirror of https://github.com/apache/activemq.git
Refactor the iterator implementation in the PrioritizedPendingList to not copy elements and instead use the level iterators. Add some additional tests.
This commit is contained in:
parent
485fcafcdf
commit
c1b58d3373
|
@ -16,20 +16,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Deque;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.management.SizeStatisticImpl;
|
import org.apache.activemq.management.SizeStatisticImpl;
|
||||||
|
|
||||||
|
|
||||||
import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
|
|
||||||
|
|
||||||
public class PrioritizedPendingList implements PendingList {
|
public class PrioritizedPendingList implements PendingList {
|
||||||
|
|
||||||
private static final Integer MAX_PRIORITY = 10;
|
private static final Integer MAX_PRIORITY = 10;
|
||||||
|
@ -121,38 +120,58 @@ public class PrioritizedPendingList implements PendingList {
|
||||||
return lists[getPriority(msg)];
|
return lists[getPriority(msg)];
|
||||||
}
|
}
|
||||||
|
|
||||||
private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
|
private final class PrioritizedPendingListIterator implements Iterator<MessageReference> {
|
||||||
private int index = 0;
|
|
||||||
private int currentIndex = 0;
|
private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>();
|
||||||
List<PendingNode> list = new ArrayList<PendingNode>(size());
|
|
||||||
|
private Iterator<MessageReference> current;
|
||||||
|
private MessageReference currentMessage;
|
||||||
|
|
||||||
PrioritizedPendingListIterator() {
|
PrioritizedPendingListIterator() {
|
||||||
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
|
for (OrderedPendingList list : lists) {
|
||||||
OrderedPendingList orderedPendingList = lists[i];
|
if (!list.isEmpty()) {
|
||||||
if (!orderedPendingList.isEmpty()) {
|
iterators.push(list.iterator());
|
||||||
list.addAll(orderedPendingList.getAsList());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
current = iterators.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
return list.size() > index;
|
while (current != null) {
|
||||||
|
if (current.hasNext()) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
current = iterators.poll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageReference next() {
|
public MessageReference next() {
|
||||||
PendingNode node = list.get(this.index);
|
MessageReference result = null;
|
||||||
this.currentIndex = this.index;
|
|
||||||
this.index++;
|
while (current != null) {
|
||||||
return node.getMessage();
|
if (current.hasNext()) {
|
||||||
|
result = currentMessage = current.next();
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
current = iterators.poll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remove() {
|
public void remove() {
|
||||||
PendingNode node = list.get(this.currentIndex);
|
if (currentMessage != null) {
|
||||||
if (node != null) {
|
pendingMessageHelper.removeFromMap(currentMessage);
|
||||||
pendingMessageHelper.removeFromMap(node.getMessage());
|
current.remove();
|
||||||
node.getList().removeNode(node);
|
currentMessage = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,11 +14,11 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -187,7 +187,9 @@ public class PrioritizedPendingListTest {
|
||||||
Iterator<MessageReference> iter = list.iterator();
|
Iterator<MessageReference> iter = list.iterator();
|
||||||
int lastId = list.size();
|
int lastId = list.size();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
assertEquals(lastId--, iter.next().getMessage().getPriority());
|
MessageReference nextMessage = iter.next();
|
||||||
|
assertNotNull(nextMessage);
|
||||||
|
assertEquals(lastId--, nextMessage.getMessage().getPriority());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,6 +217,30 @@ public class PrioritizedPendingListTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFullRangeIteration() {
|
||||||
|
PrioritizedPendingList list = new PrioritizedPendingList();
|
||||||
|
|
||||||
|
int totalElements = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
list.addMessageFirst(new TestMessageReference(totalElements++, i));
|
||||||
|
list.addMessageFirst(new TestMessageReference(totalElements++, i));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(list.size() == totalElements);
|
||||||
|
|
||||||
|
int totalIterated = 0;
|
||||||
|
Iterator<MessageReference> iter = list.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
MessageReference nextMessage = iter.next();
|
||||||
|
assertNotNull(nextMessage);
|
||||||
|
totalIterated++;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(totalElements, totalIterated);
|
||||||
|
}
|
||||||
|
|
||||||
static class TestMessageReference implements MessageReference {
|
static class TestMessageReference implements MessageReference {
|
||||||
|
|
||||||
private static final IdGenerator id = new IdGenerator();
|
private static final IdGenerator id = new IdGenerator();
|
||||||
|
|
Loading…
Reference in New Issue