diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index 90a6848a01..c6ea1ca264 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +36,7 @@ public class WireFormatNegotiator extends TransportFilter { private OpenWireFormat wireFormat; private final int minimumVersion; + private long negotiateTimeout=15000; private final AtomicBoolean firstStart=new AtomicBoolean(true); private final CountDownLatch readyCountDownLatch = new CountDownLatch(1); @@ -70,7 +72,8 @@ public class WireFormatNegotiator extends TransportFilter { public void oneway(Command command) throws IOException { try { - readyCountDownLatch.await(); + if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) ) + throw new IOException("Wire format negociation timeout: peer did not send his wire format."); } catch (InterruptedException e) { throw new InterruptedIOException(); } @@ -130,4 +133,13 @@ public class WireFormatNegotiator extends TransportFilter { protected void onWireFormatNegotiated(WireFormatInfo info) { } + + + public long getNegotiateTimeout() { + return negotiateTimeout; + } + + public void setNegotiateTimeout(long negotiateTimeout) { + this.negotiateTimeout = negotiateTimeout; + } }