diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java index 2f1229d636..4ed7086427 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java @@ -22,14 +22,21 @@ import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import javax.net.SocketFactory; +import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.nio.NIOBufferedInputStream; import org.apache.activemq.transport.nio.NIOOutputStream; +import org.apache.activemq.transport.nio.SelectorManager; +import org.apache.activemq.transport.nio.SelectorSelection; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; /** @@ -40,6 +47,7 @@ import org.apache.activemq.wireformat.WireFormat; public class StompNIOTransport extends TcpTransport { private SocketChannel channel; + private SelectorSelection selection; public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -53,8 +61,47 @@ public class StompNIOTransport extends TcpTransport { channel = socket.getChannel(); channel.configureBlocking(false); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024)); - this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + serviceRead(); + } + + public void onError(SelectorSelection selection, Throwable error) { + if (error instanceof IOException) { + onException((IOException)error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024)); } + private void serviceRead() { + try { + DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024)); + while (true) { + Object command = wireFormat.unmarshal(in); + doConsume((Command)command); + } + + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } + + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + selection.disable(); + super.doStop(stopper); + } }