https://issues.apache.org/jira/browse/AMQ-3694 - Blocked/Slow advisory consumers in duplex network connector, eventually breaks request/reply with temps. The duplex case was not acking advisory messages, so we were limited to 750! Also revisit association of producer created temp with connection as this can still get deleted before an advisory. Solution is to let gc pick up temps created in this way, https://issues.apache.org/jira/browse/AMQ-2571. Resolve contention on destination creation for producer/advisory race condition. Additional tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1239188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-01 16:11:17 +00:00
parent 91059de283
commit 179d95e7be
9 changed files with 141 additions and 36 deletions

View File

@ -125,15 +125,15 @@ public abstract class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
}
destinationsLock.writeLock().lock();
try {
Destination dest = destinations.get(destination);
if (dest == null) {
if (destination.isTemporary() == false || createIfTemporary) {
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
}
dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
@ -222,7 +222,7 @@ public abstract class AbstractRegion implements Region {
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Destination doesn't exist: " + dest);
LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
}
}
} finally {

View File

@ -65,6 +65,7 @@ import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@ -574,6 +575,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
message.setRegionDestination(this);
ProducerState state = producerExchange.getProducerState();
if (state == null) {
LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange);
throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
}
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
@ -1664,8 +1670,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}finally {
consumersLock.readLock().unlock();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination);
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
}
wakeup();
}

View File

@ -288,7 +288,7 @@ public class RegionBroker extends EmptyBroker {
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
Destination answer;
@ -297,6 +297,12 @@ public class RegionBroker extends EmptyBroker {
return answer;
}
synchronized (destinations) {
answer = destinations.get(destination);
if (answer != null) {
return answer;
}
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination,true);
@ -305,10 +311,10 @@ public class RegionBroker extends EmptyBroker {
answer = topicRegion.addDestination(context, destination,true);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
answer = tempQueueRegion.addDestination(context, destination,create);
answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
answer = tempTopicRegion.addDestination(context, destination,create);
answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
break;
default:
throw createUnknownDestinationTypeException(destination);
@ -316,6 +322,7 @@ public class RegionBroker extends EmptyBroker {
destinations.put(destination, answer);
return answer;
}
}
@ -374,22 +381,9 @@ public class RegionBroker extends EmptyBroker {
if (destination != null) {
inactiveDestinationsPurgeLock.readLock().lock();
try {
if (!destinations.containsKey(destination)) {
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, true);
// associate it with the connection so that it can get deleted
if (destination.isTemporary() && context.getConnectionState() != null) {
DestinationInfo destinationInfo = new DestinationInfo(context.getConnectionId(),
DestinationInfo.ADD_OPERATION_TYPE,
destination);
context.getConnectionState().addTempDestination(destinationInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
+ context.getConnectionId());
}
}
}
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);

View File

@ -101,6 +101,7 @@ public class TopicSubscription extends AbstractSubscription {
} else {
//we are slow
if(!isSlowConsumer()) {
LOG.warn(toString() + ": has reached its prefetch limit without an ack, it appears to be slow");
setSlowConsumer(true);
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);

View File

@ -86,6 +86,13 @@ public class MessageAck extends BaseCommand {
this.messageCount = messageCount;
}
public MessageAck(Message message, byte ackType, int messageCount) {
this.ackType = ackType;
this.destination = message.getDestination();
this.lastMessageId = message.getMessageId();
this.messageCount = messageCount;
}
public void copy(MessageAck copy) {
super.copy(copy);
copy.firstMessageId = firstMessageId;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.network;
import javax.jms.JMSException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@ -368,11 +369,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
waitStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
demandConsumerDispatched = 0;
}
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
remoteBrokerInfo = (BrokerInfo) command;
@ -411,6 +408,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
if (!isPermissableDestination(message.getDestination(), true)) {
return;
@ -431,6 +429,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
case ProducerInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
case MessageAck.DATA_STRUCTURE_TYPE:
MessageAck ack = (MessageAck) command;
DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
if (localSub != null) {
ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
localBroker.oneway(ack);
} else {
LOG.warn("Matching local subscription not found for ack: " + ack);
}
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
@ -480,6 +488,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
private void ackAdvisory(Message message) throws IOException {
demandConsumerDispatched++;
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
ack.setConsumerId(demandConsumerInfo.getConsumerId());
remoteBroker.oneway(ack);
demandConsumerDispatched = 0;
}
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getNetworkTTL();
if (data.getClass() == ConsumerInfo.class) {
@ -520,7 +538,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
}
} else {
if (LOG.isDebugEnabled()) {
@ -555,7 +573,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() +" bridging destination control command: " + destInfo);
LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
}
localBroker.oneway(destInfo);
} else if (data.getClass() == RemoveInfo.class) {

View File

@ -114,6 +114,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
uri = "static:(failover:(" + remoteURI + "))";
}
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("to-" + remoteBroker.getBrokerName());
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
connector.setConduitSubscriptions(conduit);

View File

@ -70,11 +70,12 @@ public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSup
" http://activemq.apache.org/schema/core" +
" http://activemq.apache.org/schema/core/activemq-core.xsd\">" +
" <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\"" +
" allowTempAutoCreationOnSend=\"true\" schedulePeriodForDestinationPurge=\"1000\"" +
" brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >" +
" <destinationPolicy>" +
" <policyMap>" +
" <policyEntries>" +
" <policyEntry optimizedDispatch=\"true\">"+
" <policyEntry optimizedDispatch=\"true\" gcInactiveDestinations=\"true\" gcWithNetworkConsumers=\"true\" inactiveTimoutBeforeGC=\"1000\">"+
" <destination>"+
" <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>" +
" </destination>" +
@ -260,13 +261,14 @@ public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSup
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setSchedulePeriodForDestinationPurge(1000);
broker.setAllowTempAutoCreationOnSend(true);
PolicyMap map = new PolicyMap();
PolicyEntry tempReplyQPolicy = new PolicyEntry();
tempReplyQPolicy.setOptimizedDispatch(true);
tempReplyQPolicy.setGcInactiveDestinations(true);
tempReplyQPolicy.setGcWithNetworkConsumers(true);
tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
tempReplyQPolicy.setInactiveTimoutBeforeGC(1000);
map.put(replyQWildcard, tempReplyQPolicy);
broker.setDestinationPolicy(map);

View File

@ -36,11 +36,21 @@ import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,6 +71,9 @@ public class TwoBrokerTempQueueAdvisoryTest extends JmsMultipleBrokersTestSuppor
public void testTemporaryQueueAdvisory() throws Exception {
LOG.info("Running testTemporaryQueueAdvisory()");
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerA");
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("BrokerB", 1);
@ -93,6 +106,68 @@ public class TwoBrokerTempQueueAdvisoryTest extends JmsMultipleBrokersTestSuppor
}));
}
public boolean useDuplex = true;
public void initCombosForTestSendToRemovedTemp() {
addCombinationValues("useDuplex", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
public void testSendToRemovedTemp() throws Exception {
ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
NetworkConnector nc = bridgeBrokers("BrokerA", "BrokerB");
if (useDuplex) {
nc.setDuplex(true);
} else {
bridgeBrokers("BrokerB", "BrokerA");
}
// destination advisory can loose the race with message dispatch, so we need to allow replies on network broker
// to work in the absence of an advisory, the destination will be cleaned up in the normal
// way
if (!useDuplex) {
brokers.get("BrokerB").broker.setAllowTempAutoCreationOnSend(true);
}
TransportConnector forClient = brokers.get("BrokerA").broker.addConnector("tcp://localhost:0");
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("BrokerB", 1);
waitForMinTopicRegionConsumerCount("BrokerA", 1);
ConnectionFactory factory = new ActiveMQConnectionFactory(forClient.getConnectUri());
ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection();
conn.setWatchTopicAdvisories(false);
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ConnectionFactory replyFactory = getConnectionFactory("BrokerB");
for (int i = 0; i < 500; i++) {
TemporaryQueue tempDest = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(requestReplyDest);
javax.jms.Message message = session.createTextMessage("req-" + i);
message.setJMSReplyTo(tempDest);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
producer.send(message);
ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection();
replyConnection.setWatchTopicAdvisories(false);
replyConnection.start();
Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
javax.jms.Message msg = replyConsumer.receive(10000);
assertNotNull("request message not null: " + i, msg);
MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
replyProducer.send(session.createTextMessage("reply-" + i));
replyConnection.close();
javax.jms.Message reply = consumer.receive(10000);
assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
consumer.close();
tempDest.delete();
}
}
protected DestinationViewMBean createView(String broker, String destination, byte type) throws Exception {
String domain = "org.apache.activemq";
@ -113,8 +188,9 @@ public class TwoBrokerTempQueueAdvisoryTest extends JmsMultipleBrokersTestSuppor
String options = new String("?persistent=false");
createBroker(new URI("broker:(tcp://localhost:0)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB" + options));
}
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerA");
public static Test suite() {
return suite(TwoBrokerTempQueueAdvisoryTest.class);
}
}