diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index bec56d9115..5455dded20 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1409,6 +1409,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon try { brokerInfoReceived.await(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw JMSExceptionSupport.create(e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java index 12cde681aa..c1576cec48 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java @@ -143,6 +143,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch try { md = unconsumedMessages.dequeue(-1); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw JMSExceptionSupport.create(e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index d6fa90c9fc..8d2c86989f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -400,6 +400,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw JMSExceptionSupport.create(e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 58105e41dd..378a53a445 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1337,6 +1337,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta try { executor.execute(messageDispatch); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); connection.onAsyncException(e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index 02aac882b6..b34840caa4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -118,6 +118,7 @@ public class ActiveMQSessionExecutor implements Task { } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw JMSExceptionSupport.create(e); } } @@ -160,4 +161,4 @@ public class ActiveMQSessionExecutor implements Task { return messageQueue.removeAll(); } -} \ No newline at end of file +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 15891cdc10..602ac92771 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -86,7 +86,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { protected int prefetchSize = 1000; protected boolean dispatchAsync; protected String destinationFilter = ">"; - protected boolean bridgeTempDestinations = false; + protected boolean bridgeTempDestinations = true; protected String name = "bridge"; protected ConsumerInfo demandConsumerInfo; protected int demandConsumerDispatched; @@ -271,20 +271,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { // Listen to consumer advisory messages on the remote broker to determine demand. demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); demandConsumerInfo.setDispatchAsync(dispatchAsync); - demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX - +destinationFilter)); - demandConsumerInfo.setPrefetchSize(prefetchSize); - remoteBroker.oneway(demandConsumerInfo); - + String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter; if( bridgeTempDestinations ) { - //we want information about Destinations as well - ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); - destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); - destinationInfo.setPrefetchSize(prefetchSize); - destinationInfo.setDispatchAsync(dispatchAsync); - remoteBroker.oneway(destinationInfo); + advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; } - + demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); + demandConsumerInfo.setPrefetchSize(prefetchSize); + remoteBroker.oneway(demandConsumerInfo); startedLatch.countDown(); if (!disposed){ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index c58109fdae..4138e4c621 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -55,7 +55,7 @@ public abstract class NetworkConnector extends ServiceSupport { private boolean dispatchAsync = true; private String userName; private String password; - private boolean bridgeTempDestinations=false; + private boolean bridgeTempDestinations=true; protected ConnectionFilter connectionFilter; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index cf3c92a3f2..f742f20201 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -313,6 +313,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Request to start checkpoint failed: " + e, e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java index c764153b6c..76aa59acb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java @@ -312,6 +312,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Request to start checkpoint failed: " + e, e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java index 64819fa089..0321e06471 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java @@ -98,6 +98,7 @@ class DedicatedTaskRunner implements TaskRunner { } catch (InterruptedException e) { // Someone really wants this thread to die off. + Thread.currentThread().interrupt(); } finally { // Make sure we notify any waiting threads that thread // has terminated. @@ -107,4 +108,4 @@ class DedicatedTaskRunner implements TaskRunner { } } } -} \ No newline at end of file +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index 55d05b3a89..92a813f6c9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -81,6 +81,7 @@ public class WireFormatNegotiator extends TransportFilter { if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) ) throw new IOException("Wire format negociation timeout: peer did not send his wire format."); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new InterruptedIOException(); } super.oneway(command); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index 3fcf7b0284..b56430049d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -138,6 +138,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { sleepMutex.wait(event.reconnectDelay); }catch(InterruptedException ie){ + Thread.currentThread().interrupt(); return; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 737f143d1f..58cae92a02 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -115,6 +115,7 @@ public class FailoverTransport implements CompositeTransport { handleTransportFailure(error); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); transportListener.onException(new InterruptedIOException()); } } @@ -349,6 +350,7 @@ public class FailoverTransport implements CompositeTransport { reconnectMutex.wait(1000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.debug("Interupted: " + e, e); } } @@ -399,6 +401,7 @@ public class FailoverTransport implements CompositeTransport { } catch (InterruptedException e) { // Some one may be trying to stop our thread. + Thread.currentThread().interrupt(); throw new InterruptedIOException(); } if(!disposed){ diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 82368a623c..212138b7aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -142,6 +142,7 @@ public class FanoutTransport implements CompositeTransport { } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); transportListener.onException(new InterruptedIOException()); } } @@ -394,6 +395,7 @@ public class FanoutTransport implements CompositeTransport { } } catch (InterruptedException e) { // Some one may be trying to stop our thread. + Thread.currentThread().interrupt(); throw new InterruptedIOException(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index b90414d379..130cbe4c10 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -407,6 +407,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S try { Thread.sleep(BIND_ATTEMPT_DELAY); } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); throw e; } }