https://issues.apache.org/jira/browse/AMQ-3253 - rework to make recreation of temp destination more specific to use case, result is no advisories like https://issues.apache.org/jira/browse/AMQ-2571. This avoids the possibility of looping in a network. Additional test. Resolve race condition on destination advisories, a remove could overwrite an inflight add command. A copy is now in place for the remove

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1213208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-12-12 12:19:43 +00:00
parent 9873e21aec
commit d4ccc50ea7
9 changed files with 195 additions and 25 deletions

View File

@ -183,6 +183,8 @@ public class AdvisoryBroker extends BrokerFilter {
super.removeDestination(context, destination, timeout);
DestinationInfo info = destinations.remove(destination);
if (info != null) {
// ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
info = info.copy();
info.setDestination(destination);
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
@ -204,6 +206,8 @@ public class AdvisoryBroker extends BrokerFilter {
super.removeDestinationInfo(context, destInfo);
DestinationInfo info = destinations.remove(destInfo.getDestination());
if (info != null) {
// ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
info = info.copy();
info.setDestination(destInfo.getDestination());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
@ -214,7 +218,6 @@ public class AdvisoryBroker extends BrokerFilter {
}
try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.transaction.Transaction;
/**
@ -58,6 +59,7 @@ public class ConnectionContext {
private final MessageEvaluationContext messageEvaluationContext;
private boolean dontSendReponse;
private boolean clientMaster = true;
private ConnectionState connectionState;
public ConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
@ -320,4 +322,11 @@ public class ConnectionContext {
this.faultTolerant = faultTolerant;
}
public void setConnectionState(ConnectionState connectionState) {
this.connectionState = connectionState;
}
public ConnectionState getConnectionState() {
return this.connectionState;
}
}

View File

@ -668,6 +668,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
context.setWireFormatInfo(wireFormatInfo);
context.setReconnect(info.isFailoverReconnect());
this.manageable = info.isManageable();
context.setConnectionState(state);
state.setContext(context);
state.setConnection(this);
if (info.getClientIp() == null) {

View File

@ -125,7 +125,9 @@ public abstract class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
}
destinationsLock.writeLock().lock();
try {
@ -142,18 +144,6 @@ public abstract class AbstractRegion implements Region {
destinations.put(destination, dest);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
if (destination.isTemporary()) {
// need to associate with the connection so it can get removed
if (context.getConnection() instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) context.getConnection();
DestinationInfo info = new DestinationInfo(context.getConnectionId(),
DestinationInfo.ADD_OPERATION_TYPE,
destination);
transportConnection.processAddDestination(info);
LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
+ context.getConnectionId());
}
}
}
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");
@ -207,7 +197,9 @@ public abstract class AbstractRegion implements Region {
// dropping the subscription.
}
LOG.debug("Removing destination: " + destination);
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
}
destinationsLock.writeLock().lock();
try {
@ -229,8 +221,10 @@ public abstract class AbstractRegion implements Region {
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Destination doesn't exist: " + dest);
}
}
} finally {
destinationsLock.writeLock().unlock();
}
@ -262,8 +256,10 @@ public abstract class AbstractRegion implements Region {
@SuppressWarnings("unchecked")
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
}
ActiveMQDestination destination = info.getDestination();
if (destination != null && !destination.isPattern() && !destination.isComposite()) {
// lets auto-create the destination
@ -362,8 +358,10 @@ public abstract class AbstractRegion implements Region {
@SuppressWarnings("unchecked")
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
}
Subscription sub = subscriptions.remove(info.getConsumerId());
// The sub could be removed elsewhere - see ConnectionSplitBroker
@ -418,7 +416,9 @@ public abstract class AbstractRegion implements Region {
LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
}
return;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -373,9 +374,22 @@ public class RegionBroker extends EmptyBroker {
if (destination != null) {
inactiveDestinationsPurgeLock.readLock().lock();
try {
if (!destinations.containsKey(destination)) {
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, true);
// associate it with the connection so that it can get deleted
if (destination.isTemporary() && context.getConnectionState() != null) {
DestinationInfo destinationInfo = new DestinationInfo(context.getConnectionId(),
DestinationInfo.ADD_OPERATION_TYPE,
destination);
context.getConnectionState().addTempDestination(destinationInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
+ context.getConnectionId());
}
}
}
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);

View File

@ -126,4 +126,13 @@ public class DestinationInfo extends BaseCommand {
throw new IOException("Unknown operation type: " + getOperationType());
}
public DestinationInfo copy() {
DestinationInfo result = new DestinationInfo();
super.copy(result);
result.connectionId = connectionId;
result.destination = destination;
result.operationType = operationType;
result.brokerPath = brokerPath;
return result;
}
}

View File

@ -553,7 +553,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
if (LOG.isTraceEnabled()) {
LOG.trace("bridging destination control command: " + destInfo);
LOG.trace(configuration.getBrokerName() +" bridging destination control command: " + destInfo);
}
localBroker.oneway(destInfo);
} else if (data.getClass() == RemoveInfo.class) {

View File

@ -48,6 +48,8 @@ import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -187,6 +189,18 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
return result;
}
protected void waitForMinTopicRegionConsumerCount(final String name, final int count) throws Exception {
final BrokerService broker = brokers.get(name).broker;
final TopicRegion topicRegion = (TopicRegion) ((RegionBroker) broker.getRegionBroker()).getTopicRegion();
assertTrue("found expected consumers in topic region of" + name, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("topic consumers: " + name +", " + topicRegion.getSubscriptions().toString());
return topicRegion.getSubscriptions().size() >= count;
}
}));
}
protected void waitForBridgeFormation() throws Exception {
waitForBridgeFormation(1);
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TwoBrokerTempQueueAdvisoryTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerTempQueueAdvisoryTest.class);
private void sendReceiveTempQueueMessage(String broker) throws Exception {
ConnectionFactory factory = getConnectionFactory(broker);
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createTemporaryQueue();
conn.close();
}
public void testTemporaryQueueAdvisory() throws Exception {
LOG.info("Running testTemporaryQueueAdvisory()");
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("BrokerB", 1);
waitForMinTopicRegionConsumerCount("BrokerA", 1);
final int iterations = 30;
for (int i = 0; i < iterations; i++) {
sendReceiveTempQueueMessage("BrokerA");
}
waitForMinTopicRegionConsumerCount("BrokerB", 1);
waitForMinTopicRegionConsumerCount("BrokerA", 1);
final DestinationViewMBean brokerAView = createView("BrokerA", "ActiveMQ.Advisory.TempQueue", ActiveMQDestination.TOPIC_TYPE);
assertTrue("exact amount of advisories created on A, one each for creation/deletion", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("BrokerA temp advisory enque count: " + brokerAView.getEnqueueCount());
return iterations * 2 == brokerAView.getEnqueueCount();
}
}));
final DestinationViewMBean brokerBView = createView("BrokerB", "ActiveMQ.Advisory.TempQueue", ActiveMQDestination.TOPIC_TYPE);
assertTrue("exact amount of advisories created on B, one each for creation/deletion", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("BrokerB temp advisory enque count: " + brokerBView.getEnqueueCount());
return iterations * 2 == brokerBView.getEnqueueCount();
}
}));
}
protected DestinationViewMBean createView(String broker, String destination, byte type) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
if (type == ActiveMQDestination.QUEUE_TYPE) {
name = new ObjectName(domain + ":BrokerName=" + broker + ",Type=Queue,Destination=" + destination);
} else {
name = new ObjectName(domain + ":BrokerName=" + broker + ",Type=Topic,Destination=" + destination);
}
return (DestinationViewMBean) brokers.get(broker).broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
String options = new String("?persistent=false");
createBroker(new URI("broker:(tcp://localhost:0)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB" + options));
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerA");
}
}