https://issues.apache.org/jira/browse/AMQ-4222 - remove region reference for all mutable producer exchanges (and some more refactorings)

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1423834 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-12-19 11:41:17 +00:00
parent 10e1c0c8d3
commit 7400977609
2 changed files with 44 additions and 196 deletions

View File

@ -125,16 +125,9 @@ public class RegionBroker extends EmptyBroker {
@Override
public Set <Destination> getDestinations(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.getDestinations(destination);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.getDestinations(destination);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.getDestinations(destination);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.getDestinations(destination);
default:
try {
return getRegion(destination).getDestinations(destination);
} catch (JMSException jmse) {
return Collections.emptySet();
}
}
@ -278,22 +271,10 @@ public class RegionBroker extends EmptyBroker {
return answer;
}
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination,true);
break;
case ActiveMQDestination.TOPIC_TYPE:
answer = topicRegion.addDestination(context, destination,true);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
boolean create = true;
if (destination.isTemporary())
create = createIfTemp;
answer = getRegion(destination).addDestination(context, destination, create);
destinations.put(destination, answer);
return answer;
@ -303,28 +284,10 @@ public class RegionBroker extends EmptyBroker {
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
if (destinations.containsKey(destination)) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeDestination(context, destination, timeout);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeDestination(context, destination, timeout);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeDestination(context, destination, timeout);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeDestination(context, destination, timeout);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
getRegion(destination).removeDestination(context, destination, timeout);
destinations.remove(destination);
}
}
@Override
@ -359,20 +322,7 @@ public class RegionBroker extends EmptyBroker {
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.addProducer(context, info);
break;
}
getRegion(destination).addProducer(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
}
@ -385,20 +335,7 @@ public class RegionBroker extends EmptyBroker {
if (destination != null) {
inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeProducer(context, info);
break;
}
getRegion(destination).removeProducer(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
}
@ -413,22 +350,7 @@ public class RegionBroker extends EmptyBroker {
}
inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.addConsumer(context, info);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.addConsumer(context, info);
default:
throw createUnknownDestinationTypeException(destination);
}
return getRegion(destination).addConsumer(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
}
@ -439,23 +361,7 @@ public class RegionBroker extends EmptyBroker {
ActiveMQDestination destination = info.getDestination();
inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeConsumer(context, info);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
getRegion(destination).removeConsumer(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
}
@ -479,24 +385,7 @@ public class RegionBroker extends EmptyBroker {
|| (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
// ensure the destination is registered with the RegionBroker
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
Region region;
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
region = queueRegion;
break;
case ActiveMQDestination.TOPIC_TYPE:
region = topicRegion;
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = tempQueueRegion;
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = tempTopicRegion;
break;
default:
throw createUnknownDestinationTypeException(destination);
}
producerExchange.setRegion(region);
producerExchange.setRegion(getRegion(destination));
producerExchange.setRegionDestination(null);
}
@ -504,7 +393,7 @@ public class RegionBroker extends EmptyBroker {
// clean up so these references aren't kept (possible leak) in the producer exchange
// especially since temps are transitory
if (destination.isTemporary()) {
if (producerExchange.isMutable()) {
producerExchange.setRegionDestination(null);
producerExchange.setRegion(null);
}
@ -514,46 +403,30 @@ public class RegionBroker extends EmptyBroker {
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
ActiveMQDestination destination = ack.getDestination();
Region region;
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
region = queueRegion;
break;
case ActiveMQDestination.TOPIC_TYPE:
region = topicRegion;
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = tempQueueRegion;
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = tempTopicRegion;
break;
default:
throw createUnknownDestinationTypeException(destination);
}
consumerExchange.setRegion(region);
consumerExchange.setRegion(getRegion(destination));
}
consumerExchange.getRegion().acknowledge(consumerExchange, ack);
}
protected Region getRegion(ActiveMQDestination destination) throws JMSException {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion;
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion;
default:
throw createUnknownDestinationTypeException(destination);
}
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
ActiveMQDestination destination = pull.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.messagePull(context, pull);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.messagePull(context, pull);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.messagePull(context, pull);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.messagePull(context, pull);
default:
throw createUnknownDestinationTypeException(destination);
}
return getRegion(destination).messagePull(context, pull);
}
@Override
@ -684,22 +557,7 @@ public class RegionBroker extends EmptyBroker {
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
ActiveMQDestination destination = messageDispatchNotification.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.processDispatchNotification(messageDispatchNotification);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.processDispatchNotification(messageDispatchNotification);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
getRegion(destination).processDispatchNotification(messageDispatchNotification);
}
@Override
@ -879,24 +737,9 @@ public class RegionBroker extends EmptyBroker {
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
ActiveMQDestination destination = control.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.processConsumerControl(consumerExchange, control);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.processConsumerControl(consumerExchange, control);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.processConsumerControl(consumerExchange, control);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.processConsumerControl(consumerExchange, control);
break;
default:
try {
getRegion(destination).processConsumerControl(consumerExchange, control);
} catch (JMSException jmse) {
LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
}
}

View File

@ -20,12 +20,12 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.Wait;
import javax.jms.*;
import javax.jms.Connection;
import javax.jms.*;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Map;
@ -135,9 +135,14 @@ public class AMQ4222Test extends TestSupport {
// still have a reference in his producerBrokerExchange.. this will keep the destination
// from being reclaimed by GC if there is never another send that producer makes...
// let's see if that reference is there...
TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
assertNotNull(connector);
assertEquals(1, connector.getConnections().size());
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connector.getConnections().size() == 1;
}
}));
TransportConnection transportConnection = connector.getConnections().get(0);
Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
assertEquals(1, exchanges.size());