In the queue case, when a consumer was closed it was not properly re-delivering messages to other available consumers. This was causing message to look like they got dropped.

- When we shut a queue sub down we now get it's pending+dispatched list and re-dispatch that to the other available subscriptions.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@640890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-25 16:45:00 +00:00
parent f9585fd6ab
commit ae7de6c236
9 changed files with 60 additions and 22 deletions

View File

@ -17,6 +17,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
@ -104,8 +107,9 @@ public abstract class AbstractSubscription implements Subscription {
destinations.add(destination);
}
public void remove(ConnectionContext context, Destination destination) throws Exception {
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
destinations.remove(destination);
return Collections.EMPTY_LIST;
}
public ConsumerInfo getConsumerInfo() {

View File

@ -436,11 +436,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
public void remove(ConnectionContext context, Destination destination) throws Exception {
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
pending.remove(context, destination);
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination ) {
rc.add((QueueMessageReference)r);
}
}
rc.addAll(pending.remove(context, destination));
}
return rc;
}
protected void dispatchPending() throws IOException {

View File

@ -292,25 +292,20 @@ public class Queue extends BaseDestination implements Task {
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners()
.removeConsumer(consumerId);
// redeliver inflight messages
sub.remove(context, this);
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
List<QueueMessageReference> inFlight = null;
synchronized(pagedInMessages) {
inFlight = new ArrayList<QueueMessageReference>(pagedInMessages.values());
for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference)ref;
qmr.incrementRedeliveryCounter();
if( qmr.getLockOwner()==sub ) {
qmr.unlock();
qmr.incrementRedeliveryCounter();
}
list.add(qmr);
}
for (QueueMessageReference node:inFlight){
if (!node.isDropped() && !node.isAcked()
&& node.getLockOwner() == sub) {
if (node.unlock()) {
node.incrementRedeliveryCounter();
list.add(node);
}
}
}
if (list != null && !consumers.isEmpty()) {
if (!list.isEmpty() && !consumers.isEmpty()) {
doDispatch(list);
}
}
@ -938,6 +933,7 @@ public class Queue extends BaseDestination implements Task {
if( rd.subscription instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
}
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -87,8 +87,9 @@ public interface Subscription extends SubscriptionRecovery {
* The subscription will be no longer be receiving messages from the destination.
* @param context
* @param destination
* @return a list of un-acked messages that were added to the subscription.
*/
void remove(ConnectionContext context, Destination destination) throws Exception;
List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
/**
* The ConsumerInfo object that created the subscription.

View File

@ -16,12 +16,15 @@
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@ -59,7 +62,9 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public void add(ConnectionContext context, Destination destination) throws Exception {
}
public void remove(ConnectionContext context, Destination destination) throws Exception {
@SuppressWarnings("unchecked")
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
return Collections.EMPTY_LIST;
}
public boolean isRecoveryRequired() {

View File

@ -110,6 +110,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
return isDiskListEmpty();
}
/**
* reset the cursor

View File

@ -18,12 +18,14 @@ package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@ -51,7 +53,7 @@ public interface PendingMessageCursor extends Service {
* @param destination
* @throws Exception
*/
void remove(ConnectionContext context, Destination destination) throws Exception;
List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
/**
* @return true if there are no pending messages

View File

@ -17,9 +17,11 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
@ -27,6 +29,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@ -128,11 +131,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param destination
* @throws Exception
*/
public synchronized void remove(ConnectionContext context, Destination destination) throws Exception {
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
Object tsp = topics.remove(destination);
if (tsp != null) {
storePrefetches.remove(tsp);
}
return Collections.EMPTY_LIST;
}
/**

View File

@ -16,8 +16,13 @@
*/
package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
@ -32,6 +37,18 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private Iterator<MessageReference> iter;
private MessageReference last;
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
for (MessageReference r : list) {
if( r.getRegionDestination()==destination ) {
rc.add(r);
}
}
return rc ;
}
/**
* @return true if there are no pending messages
*/