git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@822811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-10-07 17:41:21 +00:00
parent 27183f8cc8
commit 8e59e24905
4 changed files with 147 additions and 42 deletions

View File

@ -358,19 +358,11 @@ public abstract class AbstractRegion implements Region {
Subscription sub = consumerExchange.getSubscription();
if (sub == null) {
sub = subscriptions.get(ack.getConsumerId());
if (sub == null) {
//networked subscriptions are going to acknowledge in flight messages
//on behalf a subscription that is no more ...
if (!consumerExchange.getConnectionContext().isNetworkConnection()
&& !consumerExchange.getConnectionContext()
.isInRecoveryMode()) {
LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException(
"The subscription does not exist: "
+ ack.getConsumerId());
} else {
return;
}
}
consumerExchange.setSubscription(sub);
}

View File

@ -623,13 +623,24 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected void removeSubscription(DemandSubscription sub) throws IOException {
protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
ASYNC_TASKS.execute(new Runnable() {
public void run() {
sub.waitForCompletion();
try {
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
} catch (IOException e) {
LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
}
}
});
}
}
@ -652,9 +663,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (command.isMessageDispatch()) {
enqueueCounter.incrementAndGet();
final MessageDispatch md = (MessageDispatch)command;
DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage()!=null) {
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's
// broker bread crumbs and not the consumer's? However, the message's broker bread
@ -703,14 +713,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
}
} catch (IOException e) {
serviceLocalException(e);
} finally {
sub.decrementOutstandingResponses();
}
}
};
remoteBroker.asyncRequest(message, callback);
sub.incrementOutstandingResponses();
}
} else {

View File

@ -18,10 +18,13 @@ package org.apache.activemq.network;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Represents a network bridge interface
@ -29,10 +32,13 @@ import org.apache.activemq.command.ConsumerInfo;
* @version $Revision: 1.1 $
*/
public class DemandSubscription {
private static final Log LOG = LogFactory.getLog(DemandSubscription.class);
private final ConsumerInfo remoteInfo;
private final ConsumerInfo localInfo;
private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
private AtomicInteger dispatched = new AtomicInteger(0);
private AtomicBoolean activeWaiter = new AtomicBoolean();
DemandSubscription(ConsumerInfo info) {
remoteInfo = info;
@ -68,27 +74,6 @@ public class DemandSubscription {
return remoteSubsIds.isEmpty();
}
/**
* @return Returns the dispatched.
*/
public int getDispatched() {
return dispatched.get();
}
/**
* @param dispatched The dispatched to set.
*/
public void setDispatched(int dispatched) {
this.dispatched.set(dispatched);
}
/**
* @return dispatched count after incremented
*/
public int incrementDispatched() {
return dispatched.incrementAndGet();
}
/**
* @return Returns the localInfo.
*/
@ -103,4 +88,36 @@ public class DemandSubscription {
public ConsumerInfo getRemoteInfo() {
return remoteInfo;
}
public void waitForCompletion() {
if (dispatched.get() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
}
activeWaiter.set(true);
if (dispatched.get() > 0) {
synchronized (activeWaiter) {
try {
activeWaiter.wait();
} catch (InterruptedException ignored) {
}
}
if (this.dispatched.get() > 0) {
LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
}
}
}
}
public void decrementOutstandingResponses() {
if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
synchronized(activeWaiter) {
activeWaiter.notifyAll();
}
}
}
public void incrementOutstandingResponses() {
dispatched.incrementAndGet();
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.jmx.BrokerView;
public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
Destination dest;
public void testDuplicatesThroughNetwork() throws Exception {
assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
validateQueueStats();
}
private void validateQueueStats() throws Exception {
BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView();
assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount());
assertEquals("dequeue is correct", 1000, brokerView.getTotalDequeueCount());
}
protected int receiveExactMessages(String brokerName, int msgCount) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection connection = brokerItem.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
Message msg;
int i;
for (i = 0; i < msgCount; i++) {
msg = consumer.receive(1000);
if (msg == null) {
break;
}
}
connection.close();
brokerItem.connections.remove(connection);
return i;
}
public void setUp() throws Exception {
super.setUp();
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false"));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false"));
bridgeBrokers("BrokerA", "BrokerB");
startAllBrokers();
// Create queue
dest = createDestination("TEST.FOO", false);
sendMessages("BrokerA", dest, 1000);
}
}