From 95ff01053a1e91a96cd1bffdb81442be89ef3734 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 16 Mar 2009 17:13:20 +0000 Subject: [PATCH] Allow the negociation protocol to be used when not inserted as a transport filter. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@754959 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/WireFormatNegotiator.java | 94 ++++++++++--------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index 50c7660477..9b8738f3b0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -71,15 +71,19 @@ public class WireFormatNegotiator extends TransportFilter { public void start() throws Exception { super.start(); if (firstStart.compareAndSet(true, false)) { - try { - WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); - if (LOG.isDebugEnabled()) { - LOG.debug("Sending: " + info); - } - sendWireFormat(info); - } finally { - wireInfoSentDownLatch.countDown(); + sendWireFormat(); + } + } + + public void sendWireFormat() throws IOException { + try { + WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending: " + info); } + sendWireFormat(info); + } finally { + wireInfoSentDownLatch.countDown(); } } @@ -104,45 +108,49 @@ public class WireFormatNegotiator extends TransportFilter { Command command = (Command)o; if (command.isWireFormatInfo()) { WireFormatInfo info = (WireFormatInfo)command; - if (LOG.isDebugEnabled()) { - LOG.debug("Received WireFormat: " + info); - } - - try { - wireInfoSentDownLatch.await(); - - if (LOG.isDebugEnabled()) { - LOG.debug(this + " before negotiation: " + wireFormat); - } - if (!info.isValid()) { - onException(new IOException("Remote wire format magic is invalid")); - } else if (info.getVersion() < minimumVersion) { - onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")")); - } - - wireFormat.renegotiateWireFormat(info); - Socket socket = next.narrow(Socket.class); - if (socket != null) { - socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug(this + " after negotiation: " + wireFormat); - } - - } catch (IOException e) { - onException(e); - } catch (InterruptedException e) { - onException((IOException)new InterruptedIOException().initCause(e)); - } catch (Exception e) { - onException(IOExceptionSupport.create(e)); - } - readyCountDownLatch.countDown(); - onWireFormatNegotiated(info); + negociate(info); } getTransportListener().onCommand(command); } + public void negociate(WireFormatInfo info) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received WireFormat: " + info); + } + + try { + wireInfoSentDownLatch.await(); + + if (LOG.isDebugEnabled()) { + LOG.debug(this + " before negotiation: " + wireFormat); + } + if (!info.isValid()) { + onException(new IOException("Remote wire format magic is invalid")); + } else if (info.getVersion() < minimumVersion) { + onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")")); + } + + wireFormat.renegotiateWireFormat(info); + Socket socket = next.narrow(Socket.class); + if (socket != null) { + socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(this + " after negotiation: " + wireFormat); + } + + } catch (IOException e) { + onException(e); + } catch (InterruptedException e) { + onException((IOException)new InterruptedIOException().initCause(e)); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + readyCountDownLatch.countDown(); + onWireFormatNegotiated(info); + } + public void onException(IOException error) { readyCountDownLatch.countDown(); /*