mirror of https://github.com/apache/activemq.git
Adding an intial cut of an NIO based Transport. This could be really hand to help brokers scale up in situations where it needs to accecpt a large number of connections.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@493241 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e30a9dddde
commit
8e8b82024b
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
/**
|
||||
* An optimized buffered input stream for Tcp
|
||||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public class NIOInputStream extends InputStream {
|
||||
|
||||
protected int count;
|
||||
protected int position;
|
||||
private final ByteBuffer in;
|
||||
|
||||
public NIOInputStream(ByteBuffer in){
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
try {
|
||||
int rc = in.get()& 0xff;
|
||||
return rc;
|
||||
} catch ( BufferUnderflowException e ) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
public int read(byte b[],int off,int len) throws IOException{
|
||||
if( in.hasRemaining() ) {
|
||||
int rc = Math.min(len, in.remaining());
|
||||
in.get(b, off, rc);
|
||||
return rc;
|
||||
} else {
|
||||
return len == 0 ? 0 : -1;
|
||||
}
|
||||
}
|
||||
|
||||
public long skip(long n) throws IOException{
|
||||
int rc = Math.min((int)n, in.remaining());
|
||||
in.position(in.position()+rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
public int available() throws IOException{
|
||||
return in.remaining();
|
||||
}
|
||||
|
||||
public boolean markSupported(){
|
||||
return false;
|
||||
}
|
||||
|
||||
public void close() throws IOException{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,182 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
|
||||
/**
|
||||
* An optimized buffered outputstream for Tcp
|
||||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
|
||||
public class NIOOutputStream extends OutputStream {
|
||||
|
||||
private final static int BUFFER_SIZE = 8192;
|
||||
|
||||
private final WritableByteChannel out;
|
||||
private final byte[] buffer;
|
||||
private final ByteBuffer byteBuffer;
|
||||
|
||||
private int count;
|
||||
private boolean closed;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param out
|
||||
*/
|
||||
public NIOOutputStream(WritableByteChannel out) {
|
||||
this(out, BUFFER_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new buffered output stream to write data to the specified underlying output stream with the specified
|
||||
* buffer size.
|
||||
*
|
||||
* @param out the underlying output stream.
|
||||
* @param size the buffer size.
|
||||
* @throws IllegalArgumentException if size <= 0.
|
||||
*/
|
||||
public NIOOutputStream(WritableByteChannel out, int size) {
|
||||
this.out = out;
|
||||
if (size <= 0) {
|
||||
throw new IllegalArgumentException("Buffer size <= 0");
|
||||
}
|
||||
buffer = new byte[size];
|
||||
byteBuffer = ByteBuffer.wrap(buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* write a byte on to the stream
|
||||
*
|
||||
* @param b - byte to write
|
||||
* @throws IOException
|
||||
*/
|
||||
public void write(int b) throws IOException {
|
||||
checkClosed();
|
||||
if (availableBufferToWrite() < 1) {
|
||||
flush();
|
||||
}
|
||||
buffer[count++] = (byte) b;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* write a byte array to the stream
|
||||
*
|
||||
* @param b the byte buffer
|
||||
* @param off the offset into the buffer
|
||||
* @param len the length of data to write
|
||||
* @throws IOException
|
||||
*/
|
||||
public void write(byte b[], int off, int len) throws IOException {
|
||||
checkClosed();
|
||||
if (availableBufferToWrite() < len) {
|
||||
flush();
|
||||
}
|
||||
if (buffer.length >= len) {
|
||||
System.arraycopy(b, off, buffer, count, len);
|
||||
count += len;
|
||||
}
|
||||
else {
|
||||
write( ByteBuffer.wrap(b, off, len));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* flush the data to the output stream
|
||||
* This doesn't call flush on the underlying outputstream, because
|
||||
* Tcp is particularly efficent at doing this itself ....
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void flush() throws IOException {
|
||||
if (count > 0 && out != null) {
|
||||
byteBuffer.position(0);
|
||||
byteBuffer.limit(count);
|
||||
write(byteBuffer);
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* close this stream
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks that the stream has not been closed
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void checkClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new EOFException("Cannot write to the stream any more it has already been closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the amount free space in the buffer
|
||||
*/
|
||||
private int availableBufferToWrite() {
|
||||
return buffer.length - count;
|
||||
}
|
||||
|
||||
protected void write(ByteBuffer data) throws IOException {
|
||||
int remaining = data.remaining();
|
||||
int lastRemaining = remaining-1;
|
||||
long delay=1;
|
||||
while( remaining > 0 ) {
|
||||
|
||||
// We may need to do a little bit of sleeping to avoid a busy loop.
|
||||
// Slow down if no data was written out..
|
||||
if( remaining == lastRemaining ) {
|
||||
try {
|
||||
// Use exponential rollback to increase sleep time.
|
||||
Thread.sleep(delay);
|
||||
delay *= 2;
|
||||
if( delay > 1000 ) {
|
||||
delay = 1000;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
} else {
|
||||
delay = 1;
|
||||
}
|
||||
lastRemaining = remaining;
|
||||
|
||||
// Since the write is non-blocking, all the data may not have been written.
|
||||
out.write( data );
|
||||
remaining = data.remaining();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
|
||||
/**
|
||||
* An implementation of the {@link Transport} interface using raw tcp/ip
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class NIOTransport extends TcpTransport {
|
||||
|
||||
//private static final Log log = LogFactory.getLog(NIOTransport.class);
|
||||
private SocketChannel channel;
|
||||
private SelectorSelection selection;
|
||||
private ByteBuffer inputBuffer;
|
||||
private ByteBuffer currentBuffer;
|
||||
private int nextFrameSize;
|
||||
|
||||
public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
|
||||
super(wireFormat, socketFactory, remoteLocation, localLocation);
|
||||
}
|
||||
|
||||
public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
|
||||
super(wireFormat, socket);
|
||||
}
|
||||
|
||||
protected void initializeStreams() throws IOException {
|
||||
channel = socket.getChannel();
|
||||
channel.configureBlocking(false);
|
||||
|
||||
// listen for events telling us when the socket is readable.
|
||||
selection = SelectorManager.getInstance().register(channel,
|
||||
new SelectorManager.Listener() {
|
||||
public void onSelect(SelectorSelection selection) {
|
||||
serviceRead();
|
||||
}
|
||||
public void onError(SelectorSelection selection, Throwable error) {
|
||||
if( error instanceof IOException ) {
|
||||
onException((IOException) error);
|
||||
} else {
|
||||
onException(IOExceptionSupport.create(error));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Send the data via the channel
|
||||
// inputBuffer = ByteBuffer.allocateDirect(8*1024);
|
||||
inputBuffer = ByteBuffer.allocate(8*1024);
|
||||
currentBuffer = inputBuffer;
|
||||
nextFrameSize=-1;
|
||||
currentBuffer.limit(4);
|
||||
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
|
||||
|
||||
}
|
||||
|
||||
private void serviceRead() {
|
||||
try {
|
||||
while( true ) {
|
||||
|
||||
|
||||
int readSize = channel.read(currentBuffer);
|
||||
if( readSize == -1 ) {
|
||||
onException(new EOFException());
|
||||
selection.close();
|
||||
break;
|
||||
}
|
||||
if( readSize==0 ) {
|
||||
break;
|
||||
}
|
||||
|
||||
if( currentBuffer.hasRemaining() )
|
||||
continue;
|
||||
|
||||
// Are we trying to figure out the size of the next frame?
|
||||
if( nextFrameSize==-1 ) {
|
||||
assert inputBuffer == currentBuffer;
|
||||
|
||||
// If the frame is too big to fit in our direct byte buffer,
|
||||
// Then allocate a non direct byte buffer of the right size for it.
|
||||
inputBuffer.flip();
|
||||
nextFrameSize = inputBuffer.getInt()+4;
|
||||
if( nextFrameSize > inputBuffer.capacity() ) {
|
||||
currentBuffer = ByteBuffer.allocate(nextFrameSize);
|
||||
currentBuffer.putInt(nextFrameSize);
|
||||
} else {
|
||||
inputBuffer.limit(nextFrameSize);
|
||||
}
|
||||
|
||||
} else {
|
||||
currentBuffer.flip();
|
||||
|
||||
Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
|
||||
doConsume((Command) command);
|
||||
|
||||
nextFrameSize=-1;
|
||||
inputBuffer.clear();
|
||||
inputBuffer.limit(4);
|
||||
currentBuffer = inputBuffer;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
onException(e);
|
||||
} catch (Throwable e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void doStart() throws Exception {
|
||||
connect();
|
||||
selection.setInterestOps(SelectionKey.OP_READ);
|
||||
selection.enable();
|
||||
}
|
||||
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
selection.disable();
|
||||
super.doStop(stopper);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import javax.net.ServerSocketFactory;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||
import org.apache.activemq.transport.tcp.TcpTransportFactory;
|
||||
import org.apache.activemq.transport.tcp.TcpTransportServer;
|
||||
|
||||
public class NIOTransportFactory extends TcpTransportFactory {
|
||||
|
||||
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
|
||||
return new TcpTransportServer(this, location, serverSocketFactory) {
|
||||
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
|
||||
return new NIOTransport(format,socket);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
|
||||
return new NIOTransport(wf, socketFactory, location, localLocation);
|
||||
}
|
||||
|
||||
|
||||
protected ServerSocketFactory createServerSocketFactory() {
|
||||
return new ServerSocketFactory() {
|
||||
public ServerSocket createServerSocket(int port) throws IOException {
|
||||
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
|
||||
serverSocketChannel.socket().bind(new InetSocketAddress(port));
|
||||
return serverSocketChannel.socket();
|
||||
}
|
||||
public ServerSocket createServerSocket(int port, int backlog) throws IOException {
|
||||
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
|
||||
serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
|
||||
return serverSocketChannel.socket();
|
||||
}
|
||||
public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
|
||||
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
|
||||
serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
|
||||
return serverSocketChannel.socket();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected SocketFactory createSocketFactory() {
|
||||
return new SocketFactory() {
|
||||
|
||||
public Socket createSocket() throws IOException {
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
return channel.socket();
|
||||
}
|
||||
|
||||
public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.connect(new InetSocketAddress(host, port));
|
||||
return channel.socket();
|
||||
}
|
||||
|
||||
public Socket createSocket(InetAddress address, int port) throws IOException {
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.connect(new InetSocketAddress(address, port));
|
||||
return channel.socket();
|
||||
}
|
||||
|
||||
public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
|
||||
channel.connect(new InetSocketAddress(address, port));
|
||||
return channel.socket();
|
||||
}
|
||||
|
||||
public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
|
||||
channel.connect(new InetSocketAddress(address, port));
|
||||
return channel.socket();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
/**
|
||||
* The SelectorManager will manage one Selector and the thread that checks the
|
||||
* selector.
|
||||
*
|
||||
* We may need to consider running more than one thread to check the selector if
|
||||
* servicing the selector takes too long.
|
||||
*
|
||||
* @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
|
||||
*/
|
||||
final public class SelectorManager {
|
||||
|
||||
static final public SelectorManager singleton = new SelectorManager();
|
||||
static SelectorManager getInstance() {
|
||||
return singleton;
|
||||
}
|
||||
|
||||
public interface Listener {
|
||||
public void onSelect(SelectorSelection selector);
|
||||
public void onError(SelectorSelection selection, Throwable error);
|
||||
}
|
||||
|
||||
private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread rc = new Thread(r);
|
||||
rc.setName("NIO Transport Thread");
|
||||
return rc;
|
||||
}});
|
||||
private Executor channelExecutor = selectorExecutor;
|
||||
private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
|
||||
private int maxChannelsPerWorker = 64;
|
||||
|
||||
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
|
||||
throws IOException {
|
||||
|
||||
SelectorWorker worker = null;
|
||||
if (freeWorkers.size() > 0) {
|
||||
worker = freeWorkers.getFirst();
|
||||
} else {
|
||||
worker = new SelectorWorker(this);
|
||||
freeWorkers.addFirst(worker);
|
||||
}
|
||||
|
||||
SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
|
||||
return selection;
|
||||
}
|
||||
|
||||
synchronized void onWorkerFullEvent(SelectorWorker worker) {
|
||||
freeWorkers.remove(worker);
|
||||
}
|
||||
|
||||
synchronized public void onWorkerEmptyEvent(SelectorWorker worker) {
|
||||
freeWorkers.remove(worker);
|
||||
}
|
||||
|
||||
synchronized public void onWorkerNotFullEvent(SelectorWorker worker) {
|
||||
freeWorkers.add(worker);
|
||||
}
|
||||
|
||||
public Executor getChannelExecutor() {
|
||||
return channelExecutor;
|
||||
}
|
||||
|
||||
public void setChannelExecutor(Executor channelExecutor) {
|
||||
this.channelExecutor = channelExecutor;
|
||||
}
|
||||
|
||||
public int getMaxChannelsPerWorker() {
|
||||
return maxChannelsPerWorker;
|
||||
}
|
||||
|
||||
public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
|
||||
this.maxChannelsPerWorker = maxChannelsPerWorker;
|
||||
}
|
||||
|
||||
public Executor getSelectorExecutor() {
|
||||
return selectorExecutor;
|
||||
}
|
||||
|
||||
public void setSelectorExecutor(Executor selectorExecutor) {
|
||||
this.selectorExecutor = selectorExecutor;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import org.apache.activemq.transport.nio.SelectorManager.Listener;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
final public class SelectorSelection {
|
||||
|
||||
private final SelectorWorker worker;
|
||||
private final SelectionKey key;
|
||||
private final Listener listener;
|
||||
private int interest;
|
||||
|
||||
|
||||
public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
|
||||
this.worker = worker;
|
||||
this.listener = listener;
|
||||
this.key = socketChannel.register(worker.selector, 0, this);
|
||||
worker.incrementUseCounter();
|
||||
}
|
||||
|
||||
public void setInterestOps(int ops) {
|
||||
interest = ops;
|
||||
}
|
||||
|
||||
public void enable() {
|
||||
key.interestOps(interest);
|
||||
worker.selector.wakeup();
|
||||
}
|
||||
|
||||
public void disable() {
|
||||
key.interestOps(0);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
worker.decrementUseCounter();
|
||||
key.cancel();
|
||||
worker.selector.wakeup();
|
||||
}
|
||||
|
||||
public void onSelect() {
|
||||
listener.onSelect(this);
|
||||
}
|
||||
|
||||
public void onError(Throwable e) {
|
||||
listener.onError(this, e);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
public class SelectorWorker implements Runnable {
|
||||
|
||||
private final static AtomicInteger NEXT_ID = new AtomicInteger();
|
||||
|
||||
final SelectorManager manager;
|
||||
final Selector selector;
|
||||
final int id = NEXT_ID.getAndIncrement();
|
||||
final AtomicInteger useCounter = new AtomicInteger();
|
||||
final private int maxChannelsPerWorker;
|
||||
|
||||
|
||||
public SelectorWorker(SelectorManager manager) throws IOException {
|
||||
this.manager = manager;
|
||||
selector = Selector.open();
|
||||
maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
|
||||
}
|
||||
|
||||
void incrementUseCounter() {
|
||||
int use = useCounter.getAndIncrement();
|
||||
if( use == 0 ) {
|
||||
manager.getSelectorExecutor().execute(this);
|
||||
} else if( use+1 == maxChannelsPerWorker ) {
|
||||
manager.onWorkerFullEvent(this);
|
||||
}
|
||||
}
|
||||
|
||||
void decrementUseCounter() {
|
||||
int use = useCounter.getAndDecrement();
|
||||
if (use == 1) {
|
||||
manager.onWorkerEmptyEvent(this);
|
||||
} else if (use == maxChannelsPerWorker ) {
|
||||
manager.onWorkerNotFullEvent(this);
|
||||
}
|
||||
}
|
||||
|
||||
boolean isRunning() {
|
||||
return useCounter.get()!=0;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
String origName = Thread.currentThread().getName();
|
||||
try {
|
||||
Thread.currentThread().setName("Selector Worker: " + id);
|
||||
while (isRunning()) {
|
||||
|
||||
int count = selector.select(10);
|
||||
if (count == 0)
|
||||
continue;
|
||||
|
||||
if (!isRunning())
|
||||
return;
|
||||
|
||||
// Get a java.util.Set containing the SelectionKey objects
|
||||
// for all channels that are ready for I/O.
|
||||
Set keys = selector.selectedKeys();
|
||||
|
||||
for (Iterator i = keys.iterator(); i.hasNext();) {
|
||||
final SelectionKey key = (SelectionKey) i.next();
|
||||
i.remove();
|
||||
|
||||
final SelectorSelection s = (SelectorSelection) key.attachment();
|
||||
try {
|
||||
s.disable();
|
||||
|
||||
// Kick off another thread to find newly selected keys while we process the
|
||||
// currently selected keys
|
||||
manager.getChannelExecutor().execute(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
s.onSelect();
|
||||
s.enable();
|
||||
} catch (Throwable e) {
|
||||
s.onError(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
} catch ( Throwable e ) {
|
||||
s.onError(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
||||
// Don't accept any more slections
|
||||
manager.onWorkerEmptyEvent(this);
|
||||
|
||||
// Notify all the selections that the error occurred.
|
||||
Set keys = selector.keys();
|
||||
for (Iterator i = keys.iterator(); i.hasNext();) {
|
||||
SelectionKey key = (SelectionKey) i.next();
|
||||
SelectorSelection s = (SelectorSelection) key.attachment();
|
||||
s.onError(e);
|
||||
}
|
||||
|
||||
} finally {
|
||||
Thread.currentThread().setName(origName);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
class=org.apache.activemq.transport.nio.NIOTransportFactory
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import org.apache.activemq.JmsDurableTopicSendReceiveTest;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
public class NIOJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendReceiveTest {
|
||||
protected BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
}
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
protected String getBrokerURL() {
|
||||
return "nio://localhost:61616";
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(false);
|
||||
answer.addConnector(getBrokerURL());
|
||||
return answer;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class NIOJmsSendAndReceiveTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
|
||||
protected BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
}
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
protected String getBrokerURL() {
|
||||
return "nio://localhost:61616";
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(false);
|
||||
answer.addConnector(getBrokerURL());
|
||||
return answer;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
|
||||
public class NIOPersistentSendAndReceiveTest extends NIOJmsSendAndReceiveTest {
|
||||
protected BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
this.topic = false;
|
||||
this.deliveryMode = DeliveryMode.PERSISTENT;
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(true);
|
||||
answer.addConnector(getBrokerURL());
|
||||
return answer;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import org.apache.activemq.transport.TransportBrokerTestSupport;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.textui.TestRunner;
|
||||
|
||||
public class NIOTransportBrokerTest extends TransportBrokerTestSupport {
|
||||
|
||||
protected String getBindLocation() {
|
||||
return "nio://localhost:61616";
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(NIOTransportBrokerTest.class);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
TestRunner.run(suite());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue