This commit is contained in:
Dejan Bosanac 2016-02-25 12:07:41 +01:00
parent c2ad0c3251
commit 934a30a327
3 changed files with 53 additions and 50 deletions

View File

@ -92,7 +92,7 @@ public class NIOTransport extends TcpTransport {
// Send the data via the channel // Send the data via the channel
// inputBuffer = ByteBuffer.allocateDirect(8*1024); // inputBuffer = ByteBuffer.allocateDirect(8*1024);
inputBuffer = ByteBuffer.allocate(getIoBufferSize()); inputBuffer = ByteBuffer.allocateDirect(getIoBufferSize());
currentBuffer = inputBuffer; currentBuffer = inputBuffer;
nextFrameSize = -1; nextFrameSize = -1;
currentBuffer.limit(4); currentBuffer.limit(4);
@ -120,7 +120,6 @@ public class NIOTransport extends TcpTransport {
} }
this.receiveCounter += readSize; this.receiveCounter += readSize;
if (currentBuffer.hasRemaining()) { if (currentBuffer.hasRemaining()) {
continue; continue;
} }
@ -143,7 +142,7 @@ public class NIOTransport extends TcpTransport {
} }
if (nextFrameSize > inputBuffer.capacity()) { if (nextFrameSize > inputBuffer.capacity()) {
currentBuffer = ByteBuffer.allocate(nextFrameSize); currentBuffer = ByteBuffer.allocateDirect(nextFrameSize);
currentBuffer.putInt(nextFrameSize); currentBuffer.putInt(nextFrameSize);
} else { } else {
inputBuffer.limit(nextFrameSize); inputBuffer.limit(nextFrameSize);

View File

@ -19,12 +19,7 @@ package org.apache.activemq.transport.nio;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.spi.AbstractSelectableChannel; import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.Executor; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* The SelectorManager will manage one Selector and the thread that checks the * The SelectorManager will manage one Selector and the thread that checks the
@ -43,7 +38,7 @@ public final class SelectorManager {
private int maxChannelsPerWorker = 1024; private int maxChannelsPerWorker = 1024;
protected ExecutorService createDefaultExecutor() { protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultMaximumPoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() { new ThreadFactory() {
private long i = 0; private long i = 0;
@ -59,12 +54,8 @@ public final class SelectorManager {
return rc; return rc;
} }
private static int getDefaultCorePoolSize() {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 0);
}
private static int getDefaultMaximumPoolSize() { private static int getDefaultMaximumPoolSize() {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", Integer.MAX_VALUE); return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024);
} }
private static int getDefaultKeepAliveTime() { private static int getDefaultKeepAliveTime() {

View File

@ -27,9 +27,12 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -67,7 +70,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
protected ServerSocket serverSocket; protected ServerSocket serverSocket;
protected SelectorSelection selector; protected Selector selector;
protected int backlog = 5000; protected int backlog = 5000;
protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
protected final TcpTransportFactory transportFactory; protected final TcpTransportFactory transportFactory;
@ -303,9 +306,20 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
if (chan != null) { if (chan != null) {
try { try {
chan.configureBlocking(false); chan.configureBlocking(false);
selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() { selector = Selector.open();
@Override chan.register(selector, SelectionKey.OP_ACCEPT);
public void onSelect(SelectorSelection sel) { while (!isStopped()) {
int count = selector.select(10);
if (count == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
final SelectionKey key = i.next();
if (key.isAcceptable()) {
try { try {
SocketChannel sc = chan.accept(); SocketChannel sc = chan.accept();
if (sc != null) { if (sc != null) {
@ -319,18 +333,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
} }
} }
} }
} catch (SocketTimeoutException ste) {
// expect this to happen
} catch (Exception e) { } catch (Exception e) {
onError(sel, e); e.printStackTrace();
}
}
@Override
public void onError(SelectorSelection sel, Throwable error) {
Exception e = null;
if (error instanceof Exception) {
e = (Exception)error;
} else {
e = new Exception(error);
}
if (!isStopping()) { if (!isStopping()) {
onAcceptError(e); onAcceptError(e);
} else if (!isStopped()) { } else if (!isStopped()) {
@ -338,12 +345,19 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
onAcceptError(e); onAcceptError(e);
} }
} }
}); }
selector.setInterestOps(SelectionKey.OP_ACCEPT); i.remove();
selector.enable(); }
}
} catch (IOException ex) { } catch (IOException ex) {
if (selector != null) {
try {
selector.close();
} catch (IOException ioe) {}
selector = null; selector = null;
} }
}
} else { } else {
while (!isStopped()) { while (!isStopped()) {
Socket socket = null; Socket socket = null;
@ -459,7 +473,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
if (selector != null) { if (selector != null) {
selector.disable();
selector.close(); selector.close();
selector = null; selector = null;
} }