mirror of https://github.com/apache/activemq.git
Do the inital recovery dispatch in the iterate() thread so that the addSubscription() operation on the Queue executes quickly.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@638924 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aa2c8965eb
commit
5269519536
|
@ -265,7 +265,7 @@ public abstract class AbstractRegion implements Region {
|
|||
}
|
||||
|
||||
if (info.isBrowser()) {
|
||||
((QueueBrowserSubscription)sub).browseDone();
|
||||
((QueueBrowserSubscription)sub).destinationsAdded();
|
||||
}
|
||||
|
||||
return sub;
|
||||
|
|
|
@ -99,6 +99,7 @@ public class Queue extends BaseDestination implements Task {
|
|||
wakeup();
|
||||
}
|
||||
};
|
||||
|
||||
private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
|
||||
|
||||
public int compare(Subscription s1, Subscription s2) {
|
||||
|
@ -195,14 +196,19 @@ public class Queue extends BaseDestination implements Task {
|
|||
}
|
||||
}
|
||||
|
||||
class RecoveryDispatch {
|
||||
public ArrayList<QueueMessageReference> messages;
|
||||
public Subscription subscription;
|
||||
}
|
||||
|
||||
LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
|
||||
|
||||
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
|
||||
dispatchLock.lock();
|
||||
try {
|
||||
sub.add(context, this);
|
||||
destinationStatistics.getConsumers().increment();
|
||||
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
|
||||
// MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
|
||||
|
||||
// needs to be synchronized - so no contention with dispatching
|
||||
synchronized (consumers) {
|
||||
|
@ -223,21 +229,33 @@ public class Queue extends BaseDestination implements Task {
|
|||
// duplicates
|
||||
// etc.
|
||||
doPageIn(false);
|
||||
msgContext.setDestination(destination);
|
||||
// msgContext.setDestination(destination);
|
||||
|
||||
synchronized (pagedInMessages) {
|
||||
// Add all the matching messages in the queue to the
|
||||
// subscription.
|
||||
|
||||
for (QueueMessageReference node:pagedInMessages.values()){
|
||||
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
|
||||
msgContext.setMessageReference(node);
|
||||
if (sub.matches(node, msgContext)) {
|
||||
sub.add(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RecoveryDispatch rd = new RecoveryDispatch();
|
||||
rd.messages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
|
||||
rd.subscription = sub;
|
||||
recoveries.addLast(rd);
|
||||
}
|
||||
|
||||
if( sub instanceof QueueBrowserSubscription ) {
|
||||
((QueueBrowserSubscription)sub).incrementQueueRef();
|
||||
}
|
||||
|
||||
// System.out.println(new Date()+": Locked pagedInMessages: "+sub.getConsumerInfo().getConsumerId());
|
||||
// // Add all the matching messages in the queue to the
|
||||
// // subscription.
|
||||
//
|
||||
// for (QueueMessageReference node:pagedInMessages.values()){
|
||||
// if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
|
||||
// msgContext.setMessageReference(node);
|
||||
// if (sub.matches(node, msgContext)) {
|
||||
// sub.add(node);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// }
|
||||
wakeup();
|
||||
}finally {
|
||||
dispatchLock.unlock();
|
||||
|
@ -880,16 +898,56 @@ public class Queue extends BaseDestination implements Task {
|
|||
} while (count < this.destinationStatistics.getMessages().getCount());
|
||||
return movedCounter;
|
||||
}
|
||||
|
||||
RecoveryDispatch getNextRecoveryDispatch() {
|
||||
synchronized (pagedInMessages) {
|
||||
if( recoveries.isEmpty() ) {
|
||||
return null;
|
||||
}
|
||||
return recoveries.removeFirst();
|
||||
}
|
||||
|
||||
}
|
||||
protected boolean isRecoveryDispatchEmpty() {
|
||||
synchronized (pagedInMessages) {
|
||||
return recoveries.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we would like to iterate again
|
||||
* @see org.apache.activemq.thread.Task#iterate()
|
||||
*/
|
||||
public boolean iterate() {
|
||||
|
||||
RecoveryDispatch rd;
|
||||
while ((rd = getNextRecoveryDispatch()) != null) {
|
||||
try {
|
||||
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
|
||||
msgContext.setDestination(destination);
|
||||
|
||||
for (QueueMessageReference node : rd.messages) {
|
||||
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
|
||||
msgContext.setMessageReference(node);
|
||||
if (rd.subscription.matches(node, msgContext)) {
|
||||
rd.subscription.add(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( rd.subscription instanceof QueueBrowserSubscription ) {
|
||||
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
boolean result = false;
|
||||
synchronized (messages) {
|
||||
result = !messages.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
if (result) {
|
||||
try {
|
||||
pageInMessages(false);
|
||||
|
@ -1027,46 +1085,53 @@ public class Queue extends BaseDestination implements Task {
|
|||
|
||||
private void doDispatch(List<QueueMessageReference> list) throws Exception {
|
||||
if (list != null) {
|
||||
synchronized (consumers) {
|
||||
for (MessageReference node : list) {
|
||||
Subscription target = null;
|
||||
List<Subscription> targets = null;
|
||||
for (Subscription s : consumers) {
|
||||
if (dispatchSelector.canSelect(s, node)) {
|
||||
if (!s.isFull()) {
|
||||
s.add(node);
|
||||
node.incrementReferenceCount();
|
||||
target = s;
|
||||
break;
|
||||
} else {
|
||||
if (targets == null) {
|
||||
targets = new ArrayList<Subscription>();
|
||||
}
|
||||
targets.add(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (target == null && targets != null) {
|
||||
// pick the least loaded to add the message too
|
||||
for (Subscription s : targets) {
|
||||
if (target == null
|
||||
|| target.getInFlightUsage() > s
|
||||
.getInFlightUsage()) {
|
||||
target = s;
|
||||
}
|
||||
}
|
||||
if (target != null) {
|
||||
target.add(node);
|
||||
List<Subscription> consumers;
|
||||
synchronized (this.consumers) {
|
||||
consumers = new ArrayList<Subscription>(this.consumers);
|
||||
}
|
||||
|
||||
for (MessageReference node : list) {
|
||||
Subscription target = null;
|
||||
List<Subscription> targets = null;
|
||||
for (Subscription s : consumers) {
|
||||
if (dispatchSelector.canSelect(s, node)) {
|
||||
if (!s.isFull()) {
|
||||
s.add(node);
|
||||
node.incrementReferenceCount();
|
||||
target = s;
|
||||
break;
|
||||
} else {
|
||||
if (targets == null) {
|
||||
targets = new ArrayList<Subscription>();
|
||||
}
|
||||
targets.add(s);
|
||||
}
|
||||
}
|
||||
if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
|
||||
!dispatchSelector.isExclusiveConsumer(target)) {
|
||||
removeFromConsumerList(target);
|
||||
addToConsumerList(target);
|
||||
}
|
||||
|
||||
}
|
||||
if (target == null && targets != null) {
|
||||
// pick the least loaded to add the message too
|
||||
for (Subscription s : targets) {
|
||||
if (target == null
|
||||
|| target.getInFlightUsage() > s
|
||||
.getInFlightUsage()) {
|
||||
target = s;
|
||||
}
|
||||
}
|
||||
if (target != null) {
|
||||
target.add(node);
|
||||
node.incrementReferenceCount();
|
||||
}
|
||||
}
|
||||
if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
|
||||
!dispatchSelector.isExclusiveConsumer(target)) {
|
||||
synchronized (this.consumers) {
|
||||
if( removeFromConsumerList(target) ) {
|
||||
addToConsumerList(target);
|
||||
consumers = new ArrayList<Subscription>(this.consumers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1094,8 +1159,8 @@ public class Queue extends BaseDestination implements Task {
|
|||
}
|
||||
}
|
||||
|
||||
private void removeFromConsumerList(Subscription sub) {
|
||||
consumers.remove(sub);
|
||||
private boolean removeFromConsumerList(Subscription sub) {
|
||||
return consumers.remove(sub);
|
||||
}
|
||||
|
||||
private int getConsumerMessageCountBeforeFull() throws Exception {
|
||||
|
|
|
@ -29,7 +29,9 @@ import org.apache.activemq.usage.SystemUsage;
|
|||
|
||||
public class QueueBrowserSubscription extends QueueSubscription {
|
||||
|
||||
int queueRefs;
|
||||
boolean browseDone;
|
||||
boolean destinationsAdded;
|
||||
|
||||
public QueueBrowserSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
|
||||
throws InvalidSelectorException {
|
||||
|
@ -46,9 +48,16 @@ public class QueueBrowserSubscription extends QueueSubscription {
|
|||
+ this.prefetchExtension + ", pending=" + getPendingQueueSize();
|
||||
}
|
||||
|
||||
public void browseDone() throws Exception {
|
||||
browseDone = true;
|
||||
add(QueueMessageReference.NULL_MESSAGE);
|
||||
synchronized public void destinationsAdded() throws Exception {
|
||||
destinationsAdded = true;
|
||||
checkDone();
|
||||
}
|
||||
|
||||
private void checkDone() throws Exception {
|
||||
if( !browseDone && queueRefs == 0 && destinationsAdded) {
|
||||
browseDone=true;
|
||||
add(QueueMessageReference.NULL_MESSAGE);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
|
||||
|
@ -62,4 +71,13 @@ public class QueueBrowserSubscription extends QueueSubscription {
|
|||
throws IOException {
|
||||
}
|
||||
|
||||
synchronized public void incrementQueueRef() {
|
||||
queueRefs++;
|
||||
}
|
||||
|
||||
synchronized public void decrementQueueRef() throws Exception {
|
||||
queueRefs--;
|
||||
checkDone();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TempQueue extends Queue{
|
|||
log.error("Failed to page in more queue messages ", e);
|
||||
}
|
||||
}
|
||||
if (!messagesWaitingForSpace.isEmpty()) {
|
||||
if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
|
||||
try {
|
||||
taskRunner.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
|
|
Loading…
Reference in New Issue