git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813722 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-09-11 08:47:11 +00:00
parent 673fee193d
commit 0e8eadb8fb
1 changed files with 49 additions and 2 deletions

View File

@ -22,14 +22,21 @@ import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOBufferedInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/**
@ -40,6 +47,7 @@ import org.apache.activemq.wireformat.WireFormat;
public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@ -53,8 +61,47 @@ public class StompNIOTransport extends TcpTransport {
channel = socket.getChannel();
channel.configureBlocking(false);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
// listen for events telling us when the socket is readable.
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
public void onSelect(SelectorSelection selection) {
serviceRead();
}
public void onError(SelectorSelection selection, Throwable error) {
if (error instanceof IOException) {
onException((IOException)error);
} else {
onException(IOExceptionSupport.create(error));
}
}
});
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
}
private void serviceRead() {
try {
DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
while (true) {
Object command = wireFormat.unmarshal(in);
doConsume((Command)command);
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
protected void doStart() throws Exception {
connect();
selection.setInterestOps(SelectionKey.OP_READ);
selection.enable();
}
protected void doStop(ServiceStopper stopper) throws Exception {
selection.disable();
super.doStop(stopper);
}
}