https://issues.apache.org/activemq/browse/AMQ-2440 - first shot at refactoring stomp+nio - handles load much better than previous solution, but seems to still leak threads - future improvements expected

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@897939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-01-11 16:54:49 +00:00
parent 8141baacef
commit a9e7e94054
3 changed files with 61 additions and 9 deletions

View File

@ -79,6 +79,11 @@ public final class SelectorManager {
public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
freeWorkers.remove(worker);
try {
// no more connections on worker, close it
worker.close();
} catch (IOException e) {
}
}
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {

View File

@ -117,7 +117,7 @@ public class SelectorWorker implements Runnable {
}
} catch (IOException e) {
// Don't accept any more slections
// Don't accept any more selections
manager.onWorkerEmptyEvent(this);
// Notify all the selections that the error occurred.
@ -147,4 +147,7 @@ public class SelectorWorker implements Runnable {
selectorLock.readLock().unlock();
}
public void close() throws IOException {
selector.close();
}
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@ -30,11 +32,13 @@ import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOBufferedInputStream;
import org.apache.activemq.transport.nio.NIOInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@ -49,6 +53,10 @@ public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
int previousByte = -1;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
@ -76,18 +84,54 @@ public class StompNIOTransport extends TcpTransport {
}
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
}
private void serviceRead() {
try {
DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
while (true) {
Object command = wireFormat.unmarshal(in);
doConsume((Command)command);
}
while (true) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
selection.close();
break;
}
// nothing more to read, break
if (readSize == 0) {
break;
}
inputBuffer.flip();
int b;
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
int i = 0;
while(i++ < readSize) {
b = input.read();
// skip repeating nulls
if (previousByte == 0 && b == 0) {
continue;
}
currentCommand.write(b);
// end of command reached, unmarshal
if (b == 0) {
Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
doConsume((Command)command);
currentCommand.reset();
}
previousByte = b;
}
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
selection.close();
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));