From 0e8eadb8fb96e8382736a25e201bb9332a37ee89 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 11 Sep 2009 08:47:11 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2386 - stomp+nio using selectors git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813722 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/StompNIOTransport.java | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java index 2f1229d636..4ed7086427 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java @@ -22,14 +22,21 @@ import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import javax.net.SocketFactory; +import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.nio.NIOBufferedInputStream; import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; /** @@ -40,6 +47,7 @@ import org.apache.activemq.wireformat.WireFormat; public class StompNIOTransport extends TcpTransport { private SocketChannel channel; + private SelectorSelection selection; public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -53,8 +61,47 @@ public class StompNIOTransport extends TcpTransport { channel = socket.getChannel(); channel.configureBlocking(false); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024)); - this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + serviceRead(); + } + + public void onError(SelectorSelection selection, Throwable error) { + if (error instanceof IOException) { + onException((IOException)error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024)); } + private void serviceRead() { + try { + DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + while (true) { + Object command = wireFormat.unmarshal(in); + doConsume((Command)command); + } + + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } + + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + selection.disable(); + super.doStop(stopper); + } }