diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 8c4e74f11c..84df7a4d5b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -116,6 +116,7 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn protected boolean asyncDispatch = true; private boolean useAsyncSend = false; private boolean useRetroactiveConsumer; + private int closeTimeout = 15000; private long flowControlSleepTime = 0; private final JMSConnectionStatsImpl stats; @@ -541,7 +542,7 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn if (isConnectionInfoSentToBroker) { - syncSendPacket(info.createRemoveCommand()); + syncSendPacket(info.createRemoveCommand(),closeTimeout); } asyncSendPacket(new ShutdownInfo()); @@ -734,6 +735,22 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn this.optimizedMessageDispatch = dispatchOptimizedMessage; } + /** + * @return Returns the closeTimeout. + */ + public int getCloseTimeout(){ + return closeTimeout; + } + + + /** + * @param closeTimeout The closeTimeout to set. + */ + public void setCloseTimeout(int closeTimeout){ + this.closeTimeout=closeTimeout; + } + + /** * * @return Returns the onSendPrepareMessageBody. @@ -1077,6 +1094,41 @@ public class ActiveMQConnection extends DefaultTransportListener implements Conn } } } + + /** + * Send a packet through a Connection - for internal use only + * + * @param command + * @return + * @throws JMSException + */ + public Response syncSendPacket(Command command, int timeout) throws JMSException { + if (isClosed()) { + throw new ConnectionClosedException(); + } else { + + if (command.isMessage() && flowControlSleepTime > 0) { + try { + Thread.sleep(flowControlSleepTime); + } catch (InterruptedException e) { + } + } + + try { + Response response = this.transport.request(command,timeout); + if (response.isException()) { + ExceptionResponse er = (ExceptionResponse) response; + if (er.getException() instanceof JMSException) + throw (JMSException) er.getException(); + else + throw JMSExceptionSupport.create(er.getException()); + } + return response; + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + } public boolean isClosed() { return closed.get(); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index ca07725855..e2398d12d5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -81,6 +81,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean objectMessageSerializationDefered = false; protected boolean asyncDispatch = true; private boolean useAsyncSend = false; + private int closeTimeout = 15000; private boolean useRetroactiveConsumer; JMSStatsImpl factoryStats = new JMSStatsImpl(); @@ -415,6 +416,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("useCompression", Boolean.toString(isUseCompression())); props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); props.setProperty("userName", getUserName()); + props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); } public boolean isOnSendPrepareMessageBody() { @@ -448,4 +450,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setAsyncDispatch(boolean asyncDispatch) { this.asyncDispatch = asyncDispatch; } + + /** + * @return Returns the closeTimeout. + */ + public int getCloseTimeout(){ + return closeTimeout; + } + + /** + * @param closeTimeout The closeTimeout to set. + */ + public void setCloseTimeout(int closeTimeout){ + this.closeTimeout=closeTimeout; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java b/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java index 7fe31e660e..dcd6c6fa20 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java @@ -19,6 +19,8 @@ package org.apache.activemq.transport; import edu.emory.mathcs.backport.java.util.concurrent.Callable; import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; import edu.emory.mathcs.backport.java.util.concurrent.FutureTask; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; +import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException; import org.apache.activemq.command.Response; import org.apache.activemq.util.IOExceptionSupport; @@ -52,6 +54,23 @@ public class FutureResponse extends FutureTask { } } + public synchronized Response getResult(int timeout) throws IOException { + try { + return (Response) super.get(timeout,TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted."); + } catch (ExecutionException e) { + Throwable target = e.getCause(); + if( target instanceof IOException ) { + throw (IOException)target; + } else { + throw IOExceptionSupport.create(target); + } + }catch(TimeoutException e){ + return null; + } + } + public synchronized void set(Object result) { super.set(result); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java index 4d5986e994..dbfd2e92b6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java @@ -51,6 +51,12 @@ public class MutexTransport extends TransportFilter { } } + public Response request(Command command,int timeout) throws IOException { + synchronized(writeMutex){ + return next.request(command,timeout); + } + } + public String toString() { return next.toString(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java index 9bed7cbb5f..539aa511e0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java @@ -67,6 +67,11 @@ final public class ResponseCorrelator extends TransportFilter { return response.getResult(); } + public Response request(Command command,int timeout) throws IOException { + FutureResponse response = asyncRequest(command); + return response.getResult(timeout); + } + public void onCommand(Command command) { boolean debug = log.isDebugEnabled(); if( command.isResponse() ) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java index 29097eaf95..dc5b6b7725 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java @@ -32,30 +32,53 @@ public interface Transport extends Service { /** * A one way asynchronous send + * @param command + * @throws IOException */ public void oneway(Command command) throws IOException; /** * An asynchronous request response where the Receipt will be returned * in the future + * @param command + * @return the FutureResponse + * @throws IOException */ public FutureResponse asyncRequest(Command command) throws IOException; - + /** * A synchronous request response + * @param command + * @return the response + * @throws IOException */ public Response request(Command command) throws IOException; + /** + * A synchronous request response + * @param command + * @param timeout + * @return the repsonse or null if timeout + * @throws IOException + */ + public Response request(Command command, int timeout) throws IOException; + /** * Returns the current transport listener + * @return */ public TransportListener getTransportListener(); /** * Registers an inbound command listener + * @param commandListener */ public void setTransportListener(TransportListener commandListener); + /** + * @param target + * @return the target + */ public Object narrow(Class target); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 5778e3d355..585ac54358 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -93,6 +93,10 @@ public class TransportFilter extends DefaultTransportListener implements Transpo public Response request(Command command) throws IOException { return next.request(command); } + + public Response request(Command command,int timeout) throws IOException { + return next.request(command,timeout); + } public void onException(IOException error) { transportListener.onException(error); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java index d14b89bb67..afde390510 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java @@ -73,6 +73,10 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo public Response request(Command command) throws IOException { throw new AssertionError("Unsupported Method"); } + + public Response request(Command command,int timeout) throws IOException { + throw new AssertionError("Unsupported Method"); + } /** * Process the inbound command diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 9547c5892e..2e27e2f733 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -389,6 +389,10 @@ public class FailoverTransport implements CompositeTransport { public Response request(Command command) throws IOException { throw new AssertionError("Unsupported Method"); } + + public Response request(Command command,int timeout) throws IOException { + throw new AssertionError("Unsupported Method"); + } public void add(URI u[]) { for (int i = 0; i < u.length; i++) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 34bd6bd357..32bd02bf5d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -418,6 +418,10 @@ public class FanoutTransport implements CompositeTransport { public Response request(Command command) throws IOException { throw new AssertionError("Unsupported Method"); } + + public Response request(Command command,int timeout) throws IOException { + throw new AssertionError("Unsupported Method"); + } public void reconnect() { log.debug("Waking up reconnect task"); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index 116fd3a1ed..982f6c71c3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -102,6 +102,10 @@ public class MockTransport extends DefaultTransportListener implements Transport synchronized public Response request(Command command) throws IOException { return next.request(command); } + + public Response request(Command command,int timeout) throws IOException { + return next.request(command, timeout); + } synchronized public void onException(IOException error) { transportListener.onException(error); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 5fabd01b6b..a20bf85664 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -86,6 +86,10 @@ public class VMTransport implements Transport{ public Response request(Command command) throws IOException{ throw new AssertionError("Unsupported Method"); } + + public Response request(Command command,int timeout) throws IOException { + throw new AssertionError("Unsupported Method"); + } public synchronized TransportListener getTransportListener() { return transportListener;