ported fix to trunk :

http://issues.apache.org/activemq/browse/AMQ-1176

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@514754 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonas B. Lim 2007-03-05 18:04:44 +00:00
parent b743552137
commit bd7d59c38f
9 changed files with 242 additions and 108 deletions

View File

@ -134,6 +134,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean useRetroactiveConsumer; private boolean useRetroactiveConsumer;
private boolean alwaysSyncSend; private boolean alwaysSyncSend;
private int closeTimeout = 15000; private int closeTimeout = 15000;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
private final Transport transport; private final Transport transport;
private final IdGenerator clientIdGenerator; private final IdGenerator clientIdGenerator;
@ -1283,8 +1285,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// broker without having to do an RPC to the broker. // broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId()); ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1),consumerIdGenerator.getNextSequenceId());
if( watchTopicAdvisories ) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId); advisoryConsumer = new AdvisoryConsumer(this, consumerId);
} }
}
/** /**
@ -1294,6 +1298,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return useAsyncSend; return useAsyncSend;
} }
public void setUseSyncSend(boolean forceSyncSend) {
this.useSyncSend = forceSyncSend;
}
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
/** /**
* Forces the use of <a * Forces the use of <a
* href="http://activemq.apache.org/async-sends.html">Async Sends</a> * href="http://activemq.apache.org/async-sends.html">Async Sends</a>
@ -1648,6 +1667,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
checkClosedOrFailed(); checkClosedOrFailed();
for(Iterator i=this.sessions.iterator();i.hasNext();){
ActiveMQSession s=(ActiveMQSession) i.next();
if( s.isInUse(destination) ) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
activeTempDestinations.remove(destination); activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo(); DestinationInfo info = new DestinationInfo();
@ -1661,6 +1688,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public boolean isDeleted(ActiveMQDestination dest) { public boolean isDeleted(ActiveMQDestination dest) {
// If we are not watching the advisories.. then
// we will assume that the temp destination does exist.
if( advisoryConsumer==null )
return false;
return !activeTempDestinations.contains(dest); return !activeTempDestinations.contains(dest);
} }

View File

@ -88,6 +88,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean nestedMapAndListEnabled = true; private boolean nestedMapAndListEnabled = true;
JMSStatsImpl factoryStats = new JMSStatsImpl(); JMSStatsImpl factoryStats = new JMSStatsImpl();
private boolean alwaysSyncSend; private boolean alwaysSyncSend;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable run) { public Thread newThread(Runnable run) {
@ -260,6 +262,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setRedeliveryPolicy(getRedeliveryPolicy()); connection.setRedeliveryPolicy(getRedeliveryPolicy());
connection.setTransformer(getTransformer()); connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
connection.setWatchTopicAdvisories(watchTopicAdvisories);
transport.start(); transport.start();
@ -431,6 +434,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.useAsyncSend = useAsyncSend; this.useAsyncSend = useAsyncSend;
} }
public void setUseSyncSend(boolean forceSyncSend) {
this.useSyncSend = forceSyncSend;
}
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
/** /**
* @return true if always sync send messages * @return true if always sync send messages
*/ */
@ -564,6 +579,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
props.setProperty("useCompression", Boolean.toString(isUseCompression())); props.setProperty("useCompression", Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
if (getUserName() != null) { if (getUserName() != null) {
props.setProperty("userName", getUserName()); props.setProperty("userName", getUserName());

View File

@ -941,4 +941,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return false; return false;
} }
public boolean isInUse(ActiveMQTempDestination destination) {
return info.getDestination().equals(destination);
}
} }

View File

@ -1831,6 +1831,14 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
} }
public boolean isInUse(ActiveMQTempDestination destination) {
for(Iterator iter=consumers.iterator();iter.hasNext();){
ActiveMQMessageConsumer c=(ActiveMQMessageConsumer) iter.next();
if( c.isInUse(destination) ) {
return true;
}
}
return false;
}
} }

View File

@ -65,7 +65,7 @@ import org.apache.activemq.util.ServiceStopper;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
/** /**
* Routes Broker operations to the correct messaging regions for processing. * Routes Broker operations to the correct messaging regions for processing.
@ -87,7 +87,7 @@ public class RegionBroker implements Broker {
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet(); private final HashMap destinations = new HashMap();
private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
@ -246,10 +246,14 @@ public class RegionBroker implements Broker {
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
if( destinations.contains(destination) ){
throw new DestinationAlreadyExistsException(destination); Destination answer;
synchronized(destinations) {
answer = (Destination) destinations.get(destination);
if( answer!=null )
return answer;
} }
Destination answer = null;
switch(destination.getDestinationType()) { switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination); answer = queueRegion.addDestination(context, destination);
@ -267,13 +271,15 @@ public class RegionBroker implements Broker {
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
} }
destinations.add(destination); synchronized(destinations) {
destinations.put(destination, answer);
return answer; return answer;
} }
}
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception{
throws Exception{ synchronized(destinations) {
if(destinations.contains(destination)){ if( destinations.remove(destination)!=null ){
switch(destination.getDestinationType()){ switch(destination.getDestinationType()){
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeDestination(context,destination,timeout); queueRegion.removeDestination(context,destination,timeout);
@ -290,7 +296,7 @@ public class RegionBroker implements Broker {
default: default:
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
} }
destinations.remove(destination); }
} }
} }
@ -305,7 +311,10 @@ public class RegionBroker implements Broker {
} }
public ActiveMQDestination[] getDestinations() throws Exception { public ActiveMQDestination[] getDestinations() throws Exception {
ArrayList l = new ArrayList(destinations); ArrayList l;
synchronized(destinations) {
l = new ArrayList(destinations.values());
}
ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
l.toArray(rc); l.toArray(rc);
return rc; return rc;

View File

@ -18,7 +18,11 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -31,7 +35,25 @@ public class TempQueueRegion extends AbstractRegion {
public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
setAutoCreateDestinations(false); // We should allow the following to be configurable via a Destination Policy
// setAutoCreateDestinations(false);
}
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !context.isNetworkConnection() && !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
} }
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@ -46,4 +68,15 @@ public class TempQueueRegion extends AbstractRegion {
return "TempQueueRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%"; return "TempQueueRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
if( timeout == 0 )
timeout = 1;
super.removeDestination(context, destination, timeout);
}
} }

View File

@ -16,6 +16,7 @@ package org.apache.activemq.broker.region;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -36,7 +37,8 @@ public class TempTopicRegion extends AbstractRegion{
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager, public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){ TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){
super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory); super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
setAutoCreateDestinations(false); // We should allow the following to be configurable via a Destination Policy
// setAutoCreateDestinations(false);
} }
protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{ protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
@ -67,4 +69,15 @@ public class TempTopicRegion extends AbstractRegion{
return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory=" return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="
+memoryManager.getPercentUsage()+"%"; +memoryManager.getPercentUsage()+"%";
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
if( timeout == 0 )
timeout = 1;
super.removeDestination(context, destination, timeout);
}
} }

View File

@ -358,6 +358,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
lastConnectSucceeded.set(true); lastConnectSucceeded.set(true);
serviceRemoteBrokerInfo(command); serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
}else if(command.getClass() == ConnectionError.class ) { }else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command; ConnectionError ce = (ConnectionError) command;

View File

@ -535,81 +535,97 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection1); assertNoMessagesLeft(connection1);
} }
public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
}
public void testTempDestinationsRemovedOnConnectionClose() throws Exception { //
// TODO: need to reimplement this since we don't fail when we send to a non-existant
// destination. But if we can access the Region directly then we should be able to
// check that if the destination was removed.
//
// public void initCombosForTestTempDestinationsRemovedOnConnectionClose() {
// addCombinationValues( "deliveryMode", new Object[]{
// new Integer(DeliveryMode.NON_PERSISTENT),
// new Integer(DeliveryMode.PERSISTENT)} );
// addCombinationValues( "destinationType", new Object[]{
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
// }
//
// public void testTempDestinationsRemovedOnConnectionClose() throws Exception {
//
// // Setup a first connection
// StubConnection connection1 = createConnection();
// ConnectionInfo connectionInfo1 = createConnectionInfo();
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
// connection1.send(connectionInfo1);
// connection1.send(sessionInfo1);
// connection1.send(producerInfo1);
//
// destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
//
// StubConnection connection2 = createConnection();
// ConnectionInfo connectionInfo2 = createConnectionInfo();
// SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
// ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
// connection2.send(connectionInfo2);
// connection2.send(sessionInfo2);
// connection2.send(producerInfo2);
//
// // Send from connection2 to connection1's temp destination. Should succeed.
// connection2.send(createMessage(producerInfo2, destination, deliveryMode));
//
// // Close connection 1
// connection1.request(closeConnectionInfo(connectionInfo1));
//
// try {
// // Send from connection2 to connection1's temp destination. Should not succeed.
// connection2.request(createMessage(producerInfo2, destination, deliveryMode));
// fail("Expected JMSException.");
// } catch ( JMSException success ) {
// }
//
// }
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
destination = createDestinationInfo(connection1, connectionInfo1, destinationType); // public void initCombosForTestTempDestinationsAreNotAutoCreated() {
// addCombinationValues( "deliveryMode", new Object[]{
// new Integer(DeliveryMode.NON_PERSISTENT),
// new Integer(DeliveryMode.PERSISTENT)} );
// addCombinationValues( "destinationType", new Object[]{
// new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
// new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
// }
//
//
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.send(producerInfo2);
// Send from connection2 to connection1's temp destination. Should succeed. // We create temp destination on demand now so this test case is no longer
connection2.send(createMessage(producerInfo2, destination, deliveryMode)); // valid.
//
// public void testTempDestinationsAreNotAutoCreated() throws Exception {
//
// // Setup a first connection
// StubConnection connection1 = createConnection();
// ConnectionInfo connectionInfo1 = createConnectionInfo();
// SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
// ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
// connection1.send(connectionInfo1);
// connection1.send(sessionInfo1);
// connection1.send(producerInfo1);
//
// destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType);
//
// // Should not be able to send to a non-existant temp destination.
// try {
// connection1.request(createMessage(producerInfo1, destination, deliveryMode));
// fail("Expected JMSException.");
// } catch ( JMSException success ) {
// }
//
// }
// Close connection 1
connection1.request(closeConnectionInfo(connectionInfo1));
try {
// Send from connection2 to connection1's temp destination. Should not succeed.
connection2.request(createMessage(producerInfo2, destination, deliveryMode));
fail("Expected JMSException.");
} catch ( JMSException success ) {
}
}
public void initCombosForTestTempDestinationsAreNotAutoCreated() {
addCombinationValues( "deliveryMode", new Object[]{
new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)} );
addCombinationValues( "destinationType", new Object[]{
new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)} );
}
public void testTempDestinationsAreNotAutoCreated() throws Exception {
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
destination = ActiveMQDestination.createDestination(connectionInfo1.getConnectionId()+":1", destinationType);
// Should not be able to send to a non-existant temp destination.
try {
connection1.request(createMessage(producerInfo1, destination, deliveryMode));
fail("Expected JMSException.");
} catch ( JMSException success ) {
}
}
public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() { public void initCombosForTestTempDestinationsOnlyAllowsLocalConsumers() {
addCombinationValues( "deliveryMode", new Object[]{ addCombinationValues( "deliveryMode", new Object[]{