From 8e8b82024b06da33a4258be700054e17922d6abc Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 5 Jan 2007 23:23:14 +0000 Subject: [PATCH] Adding an intial cut of an NIO based Transport. This could be really hand to help brokers scale up in situations where it needs to accecpt a large number of connections. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@493241 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/nio/NIOInputStream.java | 74 +++++++ .../transport/nio/NIOOutputStream.java | 182 ++++++++++++++++++ .../activemq/transport/nio/NIOTransport.java | 156 +++++++++++++++ .../transport/nio/NIOTransportFactory.java | 110 +++++++++++ .../transport/nio/SelectorManager.java | 109 +++++++++++ .../transport/nio/SelectorSelection.java | 72 +++++++ .../transport/nio/SelectorWorker.java | 130 +++++++++++++ .../org/apache/activemq/transport/nio | 1 + .../NIOJmsDurableTopicSendReceiveTest.java | 57 ++++++ .../nio/NIOJmsSendAndReceiveTest.java | 60 ++++++ .../nio/NIOPersistentSendAndReceiveTest.java | 39 ++++ .../transport/nio/NIOTransportBrokerTest.java | 39 ++++ 12 files changed, 1029 insertions(+) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java new file mode 100644 index 0000000000..6d4744107c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOInputStream.java @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +/** + * An optimized buffered input stream for Tcp + * + * @version $Revision: 1.1.1.1 $ + */ +public class NIOInputStream extends InputStream { + + protected int count; + protected int position; + private final ByteBuffer in; + + public NIOInputStream(ByteBuffer in){ + this.in = in; + } + + public int read() throws IOException { + try { + int rc = in.get()& 0xff; + return rc; + } catch ( BufferUnderflowException e ) { + return -1; + } + } + + public int read(byte b[],int off,int len) throws IOException{ + if( in.hasRemaining() ) { + int rc = Math.min(len, in.remaining()); + in.get(b, off, rc); + return rc; + } else { + return len == 0 ? 0 : -1; + } + } + + public long skip(long n) throws IOException{ + int rc = Math.min((int)n, in.remaining()); + in.position(in.position()+rc); + return rc; + } + + public int available() throws IOException{ + return in.remaining(); + } + + public boolean markSupported(){ + return false; + } + + public void close() throws IOException{ + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java new file mode 100644 index 0000000000..e36049082e --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java @@ -0,0 +1,182 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +/** + * An optimized buffered outputstream for Tcp + * + * @version $Revision: 1.1.1.1 $ + */ + +public class NIOOutputStream extends OutputStream { + + private final static int BUFFER_SIZE = 8192; + + private final WritableByteChannel out; + private final byte[] buffer; + private final ByteBuffer byteBuffer; + + private int count; + private boolean closed; + + /** + * Constructor + * + * @param out + */ + public NIOOutputStream(WritableByteChannel out) { + this(out, BUFFER_SIZE); + } + + /** + * Creates a new buffered output stream to write data to the specified underlying output stream with the specified + * buffer size. + * + * @param out the underlying output stream. + * @param size the buffer size. + * @throws IllegalArgumentException if size <= 0. + */ + public NIOOutputStream(WritableByteChannel out, int size) { + this.out = out; + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buffer = new byte[size]; + byteBuffer = ByteBuffer.wrap(buffer); + } + + /** + * write a byte on to the stream + * + * @param b - byte to write + * @throws IOException + */ + public void write(int b) throws IOException { + checkClosed(); + if (availableBufferToWrite() < 1) { + flush(); + } + buffer[count++] = (byte) b; + } + + + /** + * write a byte array to the stream + * + * @param b the byte buffer + * @param off the offset into the buffer + * @param len the length of data to write + * @throws IOException + */ + public void write(byte b[], int off, int len) throws IOException { + checkClosed(); + if (availableBufferToWrite() < len) { + flush(); + } + if (buffer.length >= len) { + System.arraycopy(b, off, buffer, count, len); + count += len; + } + else { + write( ByteBuffer.wrap(b, off, len)); + } + } + + /** + * flush the data to the output stream + * This doesn't call flush on the underlying outputstream, because + * Tcp is particularly efficent at doing this itself .... + * + * @throws IOException + */ + public void flush() throws IOException { + if (count > 0 && out != null) { + byteBuffer.position(0); + byteBuffer.limit(count); + write(byteBuffer); + count = 0; + } + } + + /** + * close this stream + * + * @throws IOException + */ + public void close() throws IOException { + super.close(); + closed = true; + } + + + /** + * Checks that the stream has not been closed + * + * @throws IOException + */ + protected void checkClosed() throws IOException { + if (closed) { + throw new EOFException("Cannot write to the stream any more it has already been closed"); + } + } + + /** + * @return the amount free space in the buffer + */ + private int availableBufferToWrite() { + return buffer.length - count; + } + + protected void write(ByteBuffer data) throws IOException { + int remaining = data.remaining(); + int lastRemaining = remaining-1; + long delay=1; + while( remaining > 0 ) { + + // We may need to do a little bit of sleeping to avoid a busy loop. + // Slow down if no data was written out.. + if( remaining == lastRemaining ) { + try { + // Use exponential rollback to increase sleep time. + Thread.sleep(delay); + delay *= 2; + if( delay > 1000 ) { + delay = 1000; + } + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } else { + delay = 1; + } + lastRemaining = remaining; + + // Since the write is non-blocking, all the data may not have been written. + out.write( data ); + remaining = data.remaining(); + } + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java new file mode 100644 index 0000000000..615b70a8ee --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import javax.net.SocketFactory; + +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; + +/** + * An implementation of the {@link Transport} interface using raw tcp/ip + * + * @version $Revision$ + */ +public class NIOTransport extends TcpTransport { + + //private static final Log log = LogFactory.getLog(NIOTransport.class); + private SocketChannel channel; + private SelectorSelection selection; + private ByteBuffer inputBuffer; + private ByteBuffer currentBuffer; + private int nextFrameSize; + + public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + protected void initializeStreams() throws IOException { + channel = socket.getChannel(); + channel.configureBlocking(false); + + // listen for events telling us when the socket is readable. + selection = SelectorManager.getInstance().register(channel, + new SelectorManager.Listener() { + public void onSelect(SelectorSelection selection) { + serviceRead(); + } + public void onError(SelectorSelection selection, Throwable error) { + if( error instanceof IOException ) { + onException((IOException) error); + } else { + onException(IOExceptionSupport.create(error)); + } + } + }); + + // Send the data via the channel +// inputBuffer = ByteBuffer.allocateDirect(8*1024); + inputBuffer = ByteBuffer.allocate(8*1024); + currentBuffer = inputBuffer; + nextFrameSize=-1; + currentBuffer.limit(4); + this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024)); + + } + + private void serviceRead() { + try { + while( true ) { + + + int readSize = channel.read(currentBuffer); + if( readSize == -1 ) { + onException(new EOFException()); + selection.close(); + break; + } + if( readSize==0 ) { + break; + } + + if( currentBuffer.hasRemaining() ) + continue; + + // Are we trying to figure out the size of the next frame? + if( nextFrameSize==-1 ) { + assert inputBuffer == currentBuffer; + + // If the frame is too big to fit in our direct byte buffer, + // Then allocate a non direct byte buffer of the right size for it. + inputBuffer.flip(); + nextFrameSize = inputBuffer.getInt()+4; + if( nextFrameSize > inputBuffer.capacity() ) { + currentBuffer = ByteBuffer.allocate(nextFrameSize); + currentBuffer.putInt(nextFrameSize); + } else { + inputBuffer.limit(nextFrameSize); + } + + } else { + currentBuffer.flip(); + + Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); + doConsume((Command) command); + + nextFrameSize=-1; + inputBuffer.clear(); + inputBuffer.limit(4); + currentBuffer = inputBuffer; + } + + } + + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } + + + protected void doStart() throws Exception { + connect(); + selection.setInterestOps(SelectionKey.OP_READ); + selection.enable(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + selection.disable(); + super.doStop(stopper); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java new file mode 100644 index 0000000000..c7a02028e9 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; + +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; + +public class NIOTransportFactory extends TcpTransportFactory { + + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + return new NIOTransport(format,socket); + } + }; + } + + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new NIOTransport(wf, socketFactory, location, localLocation); + } + + + protected ServerSocketFactory createServerSocketFactory() { + return new ServerSocketFactory() { + public ServerSocket createServerSocket(int port) throws IOException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + return serverSocketChannel.socket(); + } + public ServerSocket createServerSocket(int port, int backlog) throws IOException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog); + return serverSocketChannel.socket(); + } + public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog); + return serverSocketChannel.socket(); + } + }; + } + + protected SocketFactory createSocketFactory() { + return new SocketFactory() { + + public Socket createSocket() throws IOException { + SocketChannel channel = SocketChannel.open(); + return channel.socket(); + } + + public Socket createSocket(String host, int port) throws IOException, UnknownHostException { + SocketChannel channel = SocketChannel.open(); + channel.connect(new InetSocketAddress(host, port)); + return channel.socket(); + } + + public Socket createSocket(InetAddress address, int port) throws IOException { + SocketChannel channel = SocketChannel.open(); + channel.connect(new InetSocketAddress(address, port)); + return channel.socket(); + } + + public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException { + SocketChannel channel = SocketChannel.open(); + channel.socket().bind(new InetSocketAddress(localAddresss, localPort)); + channel.connect(new InetSocketAddress(address, port)); + return channel.socket(); + } + + public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException { + SocketChannel channel = SocketChannel.open(); + channel.socket().bind(new InetSocketAddress(localAddresss, localPort)); + channel.connect(new InetSocketAddress(address, port)); + return channel.socket(); + } + }; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java new file mode 100644 index 0000000000..35cffe269f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * The SelectorManager will manage one Selector and the thread that checks the + * selector. + * + * We may need to consider running more than one thread to check the selector if + * servicing the selector takes too long. + * + * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $ + */ +final public class SelectorManager { + + static final public SelectorManager singleton = new SelectorManager(); + static SelectorManager getInstance() { + return singleton; + } + + public interface Listener { + public void onSelect(SelectorSelection selector); + public void onError(SelectorSelection selection, Throwable error); + } + + private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory(){ + public Thread newThread(Runnable r) { + Thread rc = new Thread(r); + rc.setName("NIO Transport Thread"); + return rc; + }}); + private Executor channelExecutor = selectorExecutor; + private LinkedList freeWorkers = new LinkedList(); + private int maxChannelsPerWorker = 64; + + public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) + throws IOException { + + SelectorWorker worker = null; + if (freeWorkers.size() > 0) { + worker = freeWorkers.getFirst(); + } else { + worker = new SelectorWorker(this); + freeWorkers.addFirst(worker); + } + + SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener); + return selection; + } + + synchronized void onWorkerFullEvent(SelectorWorker worker) { + freeWorkers.remove(worker); + } + + synchronized public void onWorkerEmptyEvent(SelectorWorker worker) { + freeWorkers.remove(worker); + } + + synchronized public void onWorkerNotFullEvent(SelectorWorker worker) { + freeWorkers.add(worker); + } + + public Executor getChannelExecutor() { + return channelExecutor; + } + + public void setChannelExecutor(Executor channelExecutor) { + this.channelExecutor = channelExecutor; + } + + public int getMaxChannelsPerWorker() { + return maxChannelsPerWorker; + } + + public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { + this.maxChannelsPerWorker = maxChannelsPerWorker; + } + + public Executor getSelectorExecutor() { + return selectorExecutor; + } + + public void setSelectorExecutor(Executor selectorExecutor) { + this.selectorExecutor = selectorExecutor; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java new file mode 100644 index 0000000000..a3891c8397 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.apache.activemq.transport.nio.SelectorManager.Listener; + +/** + * + * @author chirino + */ +final public class SelectorSelection { + + private final SelectorWorker worker; + private final SelectionKey key; + private final Listener listener; + private int interest; + + + public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException { + this.worker = worker; + this.listener = listener; + this.key = socketChannel.register(worker.selector, 0, this); + worker.incrementUseCounter(); + } + + public void setInterestOps(int ops) { + interest = ops; + } + + public void enable() { + key.interestOps(interest); + worker.selector.wakeup(); + } + + public void disable() { + key.interestOps(0); + } + + public void close() { + worker.decrementUseCounter(); + key.cancel(); + worker.selector.wakeup(); + } + + public void onSelect() { + listener.onSelect(this); + } + + public void onError(Throwable e) { + listener.onError(this, e); + } + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java new file mode 100644 index 0000000000..f24c266b5a --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + + +public class SelectorWorker implements Runnable { + + private final static AtomicInteger NEXT_ID = new AtomicInteger(); + + final SelectorManager manager; + final Selector selector; + final int id = NEXT_ID.getAndIncrement(); + final AtomicInteger useCounter = new AtomicInteger(); + final private int maxChannelsPerWorker; + + + public SelectorWorker(SelectorManager manager) throws IOException { + this.manager = manager; + selector = Selector.open(); + maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); + } + + void incrementUseCounter() { + int use = useCounter.getAndIncrement(); + if( use == 0 ) { + manager.getSelectorExecutor().execute(this); + } else if( use+1 == maxChannelsPerWorker ) { + manager.onWorkerFullEvent(this); + } + } + + void decrementUseCounter() { + int use = useCounter.getAndDecrement(); + if (use == 1) { + manager.onWorkerEmptyEvent(this); + } else if (use == maxChannelsPerWorker ) { + manager.onWorkerNotFullEvent(this); + } + } + + boolean isRunning() { + return useCounter.get()!=0; + } + + public void run() { + + String origName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("Selector Worker: " + id); + while (isRunning()) { + + int count = selector.select(10); + if (count == 0) + continue; + + if (!isRunning()) + return; + + // Get a java.util.Set containing the SelectionKey objects + // for all channels that are ready for I/O. + Set keys = selector.selectedKeys(); + + for (Iterator i = keys.iterator(); i.hasNext();) { + final SelectionKey key = (SelectionKey) i.next(); + i.remove(); + + final SelectorSelection s = (SelectorSelection) key.attachment(); + try { + s.disable(); + + // Kick off another thread to find newly selected keys while we process the + // currently selected keys + manager.getChannelExecutor().execute(new Runnable() { + public void run() { + try { + s.onSelect(); + s.enable(); + } catch (Throwable e) { + s.onError(e); + } + } + }); + + } catch ( Throwable e ) { + s.onError(e); + } + + } + + } + } catch (IOException e) { + + // Don't accept any more slections + manager.onWorkerEmptyEvent(this); + + // Notify all the selections that the error occurred. + Set keys = selector.keys(); + for (Iterator i = keys.iterator(); i.hasNext();) { + SelectionKey key = (SelectionKey) i.next(); + SelectorSelection s = (SelectorSelection) key.attachment(); + s.onError(e); + } + + } finally { + Thread.currentThread().setName(origName); + } + } +} \ No newline at end of file diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio new file mode 100644 index 0000000000..c566449e6d --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/nio @@ -0,0 +1 @@ +class=org.apache.activemq.transport.nio.NIOTransportFactory diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java new file mode 100644 index 0000000000..b0fc3ccfaf --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import org.apache.activemq.JmsDurableTopicSendReceiveTest; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + +public class NIOJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendReceiveTest { + protected BrokerService broker; + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + broker.start(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL()); + return connectionFactory; + } + + protected String getBrokerURL() { + return "nio://localhost:61616"; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.addConnector(getBrokerURL()); + return answer; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java new file mode 100644 index 0000000000..e33795fec4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; + +/** + * @version $Revision$ + */ +public class NIOJmsSendAndReceiveTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + protected BrokerService broker; + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + broker.start(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL()); + return connectionFactory; + } + + protected String getBrokerURL() { + return "nio://localhost:61616"; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.addConnector(getBrokerURL()); + return answer; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java new file mode 100644 index 0000000000..cf989330bf --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOPersistentSendAndReceiveTest.java @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import org.apache.activemq.broker.BrokerService; + +import javax.jms.DeliveryMode; + +public class NIOPersistentSendAndReceiveTest extends NIOJmsSendAndReceiveTest { + protected BrokerService broker; + + protected void setUp() throws Exception { + this.topic = false; + this.deliveryMode = DeliveryMode.PERSISTENT; + super.setUp(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(true); + answer.addConnector(getBrokerURL()); + return answer; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java new file mode 100644 index 0000000000..6b09eb0dc2 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import org.apache.activemq.transport.TransportBrokerTestSupport; + +import junit.framework.Test; +import junit.textui.TestRunner; + +public class NIOTransportBrokerTest extends TransportBrokerTestSupport { + + protected String getBindLocation() { + return "nio://localhost:61616"; + } + + public static Test suite() { + return suite(NIOTransportBrokerTest.class); + } + + public static void main(String[] args) { + TestRunner.run(suite()); + } + +}