git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-09-04 05:44:38 +00:00
parent ac66a09dab
commit b75a6dac1b
15 changed files with 25 additions and 16 deletions

View File

@ -1409,6 +1409,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
try { try {
brokerInfoReceived.await(); brokerInfoReceived.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -143,6 +143,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
try { try {
md = unconsumedMessages.dequeue(-1); md = unconsumedMessages.dequeue(-1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }

View File

@ -400,6 +400,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -1337,6 +1337,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
try { try {
executor.execute(messageDispatch); executor.execute(messageDispatch);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
connection.onAsyncException(e); connection.onAsyncException(e);
} }
} }

View File

@ -118,6 +118,7 @@ public class ActiveMQSessionExecutor implements Task {
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
} }
} }

View File

@ -86,7 +86,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected int prefetchSize = 1000; protected int prefetchSize = 1000;
protected boolean dispatchAsync; protected boolean dispatchAsync;
protected String destinationFilter = ">"; protected String destinationFilter = ">";
protected boolean bridgeTempDestinations = false; protected boolean bridgeTempDestinations = true;
protected String name = "bridge"; protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo; protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched; protected int demandConsumerDispatched;
@ -271,20 +271,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
// Listen to consumer advisory messages on the remote broker to determine demand. // Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(dispatchAsync); demandConsumerInfo.setDispatchAsync(dispatchAsync);
demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter;
+destinationFilter)); if( bridgeTempDestinations ) {
advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
demandConsumerInfo.setPrefetchSize(prefetchSize); demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo); remoteBroker.oneway(demandConsumerInfo);
if( bridgeTempDestinations ) {
//we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
destinationInfo.setDispatchAsync(dispatchAsync);
remoteBroker.oneway(destinationInfo);
}
startedLatch.countDown(); startedLatch.countDown();
if (!disposed){ if (!disposed){

View File

@ -55,7 +55,7 @@ public abstract class NetworkConnector extends ServiceSupport {
private boolean dispatchAsync = true; private boolean dispatchAsync = true;
private String userName; private String userName;
private String password; private String password;
private boolean bridgeTempDestinations=false; private boolean bridgeTempDestinations=true;
protected ConnectionFilter connectionFilter; protected ConnectionFilter connectionFilter;

View File

@ -313,6 +313,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e); log.warn("Request to start checkpoint failed: " + e, e);
} }
} }

View File

@ -312,6 +312,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e); log.warn("Request to start checkpoint failed: " + e, e);
} }
} }

View File

@ -98,6 +98,7 @@ class DedicatedTaskRunner implements TaskRunner {
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Someone really wants this thread to die off. // Someone really wants this thread to die off.
Thread.currentThread().interrupt();
} finally { } finally {
// Make sure we notify any waiting threads that thread // Make sure we notify any waiting threads that thread
// has terminated. // has terminated.

View File

@ -81,6 +81,7 @@ public class WireFormatNegotiator extends TransportFilter {
if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) ) if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) )
throw new IOException("Wire format negociation timeout: peer did not send his wire format."); throw new IOException("Wire format negociation timeout: peer did not send his wire format.");
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
super.oneway(command); super.oneway(command);

View File

@ -138,6 +138,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
sleepMutex.wait(event.reconnectDelay); sleepMutex.wait(event.reconnectDelay);
}catch(InterruptedException ie){ }catch(InterruptedException ie){
Thread.currentThread().interrupt();
return; return;
} }
} }

View File

@ -115,6 +115,7 @@ public class FailoverTransport implements CompositeTransport {
handleTransportFailure(error); handleTransportFailure(error);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt();
transportListener.onException(new InterruptedIOException()); transportListener.onException(new InterruptedIOException());
} }
} }
@ -349,6 +350,7 @@ public class FailoverTransport implements CompositeTransport {
reconnectMutex.wait(1000); reconnectMutex.wait(1000);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.debug("Interupted: " + e, e); log.debug("Interupted: " + e, e);
} }
} }
@ -399,6 +401,7 @@ public class FailoverTransport implements CompositeTransport {
} }
catch (InterruptedException e) { catch (InterruptedException e) {
// Some one may be trying to stop our thread. // Some one may be trying to stop our thread.
Thread.currentThread().interrupt();
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
if(!disposed){ if(!disposed){

View File

@ -142,6 +142,7 @@ public class FanoutTransport implements CompositeTransport {
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt();
transportListener.onException(new InterruptedIOException()); transportListener.onException(new InterruptedIOException());
} }
} }
@ -394,6 +395,7 @@ public class FanoutTransport implements CompositeTransport {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Some one may be trying to stop our thread. // Some one may be trying to stop our thread.
Thread.currentThread().interrupt();
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
} }

View File

@ -407,6 +407,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
try { try {
Thread.sleep(BIND_ATTEMPT_DELAY); Thread.sleep(BIND_ATTEMPT_DELAY);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
Thread.currentThread().interrupt();
throw e; throw e;
} }
} }