merging 897262,897939,898774,916762,916780,920325,920330,920827,920838,920881 - https://issues.apache.org/activemq/browse/AMQ-2440 - stomp+nio

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@921318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-03-10 11:48:08 +00:00
parent b8ddf93692
commit 21fc45e3d9
11 changed files with 353 additions and 127 deletions

View File

@ -77,6 +77,7 @@ import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -91,6 +92,8 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
/**
* @version $Revision: 1.8 $
*/
@ -908,8 +911,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true);
}
try {
new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
@Override
getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
serviceLock.writeLock().lock();
try {
@ -922,7 +924,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
serviceLock.writeLock().unlock();
}
}
}.start();
});
} catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
stopped.countDown();

View File

@ -21,6 +21,7 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@ -32,6 +33,9 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -202,9 +206,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
// Starting the connection could block due to
// wireformat negotiation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
getDefaultTaskRunnerFactory().execute(new Runnable(){
public void run() {
try {
Connection connection = createConnection(transport);
@ -214,8 +216,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
onAcceptError(e);
}
}
};
startThread.start();
});
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);

View File

@ -26,24 +26,24 @@ import java.util.concurrent.ThreadFactory;
*/
public final class DefaultThreadPools {
private static final Executor DEFAULT_POOL;
static {
DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
thread.setDaemon(true);
return thread;
}
});
}
// private static final Executor DEFAULT_POOL;
// static {
// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
// public Thread newThread(Runnable runnable) {
// Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
// thread.setDaemon(true);
// return thread;
// }
// });
// }
private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
private DefaultThreadPools() {
}
public static Executor getDefaultPool() {
return DEFAULT_POOL;
}
// public static Executor getDefaultPool() {
// return DEFAULT_POOL;
// }
public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
return DEFAULT_TASK_RUNNER_FACTORY;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit;
*
* @version $Revision: 1.5 $
*/
public class TaskRunnerFactory {
public class TaskRunnerFactory implements Executor {
private ExecutorService executor;
private int maxIterationsPerRun;
@ -80,6 +81,18 @@ public class TaskRunnerFactory {
}
}
public void execute(Runnable runnable) {
execute(runnable, "ActiveMQ Task");
}
public void execute(Runnable runnable, String name) {
if (executor != null) {
executor.execute(runnable);
} else {
new Thread(runnable, name).start();
}
}
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {

View File

@ -149,7 +149,7 @@ public class NIOTransport extends TcpTransport {
}
protected void doStop(ServiceStopper stopper) throws Exception {
selection.disable();
selection.close();
super.doStop(stopper);
}
}

View File

@ -20,8 +20,11 @@ import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* The SelectorManager will manage one Selector and the thread that checks the
@ -36,16 +39,20 @@ public final class SelectorManager {
public static final SelectorManager SINGLETON = new SelectorManager();
private Executor selectorExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread rc = new Thread(r);
rc.setName("NIO Transport Thread");
return rc;
}
});
private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 64;
private int maxChannelsPerWorker = 1024;
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "ActiveMQ NIO Worker");
}
});
// rc.allowCoreThreadTimeOut(true);
return rc;
}
public static SelectorManager getInstance() {
return SINGLETON;
@ -61,15 +68,25 @@ public final class SelectorManager {
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
throws IOException {
SelectorWorker worker = null;
if (freeWorkers.size() > 0) {
worker = freeWorkers.getFirst();
} else {
worker = new SelectorWorker(this);
freeWorkers.addFirst(worker);
SelectorSelection selection = null;
while( selection == null ) {
if (freeWorkers.size() > 0) {
SelectorWorker worker = freeWorkers.getFirst();
if( worker.isReleased() ) {
freeWorkers.remove(worker);
} else {
worker.retain();
selection = new SelectorSelection(worker, socketChannel, listener);
}
} else {
// Worker starts /w retain count of 1
SelectorWorker worker = new SelectorWorker(this);
freeWorkers.addFirst(worker);
selection = new SelectorSelection(worker, socketChannel, listener);
}
}
SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
return selection;
}
@ -82,7 +99,7 @@ public final class SelectorManager {
}
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
freeWorkers.add(worker);
freeWorkers.addFirst(worker);
}
public Executor getChannelExecutor() {

View File

@ -16,9 +16,11 @@
*/
package org.apache.activemq.transport.nio;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.nio.SelectorManager.Listener;
@ -28,23 +30,23 @@ import org.apache.activemq.transport.nio.SelectorManager.Listener;
public final class SelectorSelection {
private final SelectorWorker worker;
private final SelectionKey key;
private final Listener listener;
private int interest;
private SelectionKey key;
private AtomicBoolean closed = new AtomicBoolean();
public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
this.worker = worker;
this.listener = listener;
// Lock when mutating state of the selector
worker.lock();
try {
this.key = socketChannel.register(worker.selector, 0, this);
worker.incrementUseCounter();
} finally {
worker.unlock();
}
worker.addIoTask(new Runnable() {
public void run() {
try {
SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void setInterestOps(int ops) {
@ -52,25 +54,39 @@ public final class SelectorSelection {
}
public void enable() {
key.interestOps(interest);
worker.selector.wakeup();
worker.addIoTask(new Runnable() {
public void run() {
try {
key.interestOps(interest);
} catch (CancelledKeyException e) {
}
}
});
}
public void disable() {
if (key.isValid()) {
key.interestOps(0);
}
worker.addIoTask(new Runnable() {
public void run() {
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
}
}
});
}
public void close() {
worker.decrementUseCounter();
// Lock when mutating state of the selector
worker.lock();
try {
key.cancel();
} finally {
worker.unlock();
// guard against multiple closes.
if( closed.compareAndSet(false, true) ) {
worker.addIoTask(new Runnable() {
public void run() {
try {
key.cancel();
} catch (CancelledKeyException e) {
}
worker.release();
}
});
}
}

View File

@ -21,10 +21,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SelectorWorker implements Runnable {
@ -33,55 +31,71 @@ public class SelectorWorker implements Runnable {
final SelectorManager manager;
final Selector selector;
final int id = NEXT_ID.getAndIncrement();
final AtomicInteger useCounter = new AtomicInteger();
private final int maxChannelsPerWorker;
private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
final AtomicInteger retainCounter = new AtomicInteger(1);
private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
selector = Selector.open();
maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
manager.getSelectorExecutor().execute(this);
}
void incrementUseCounter() {
int use = useCounter.getAndIncrement();
if (use == 0) {
manager.getSelectorExecutor().execute(this);
} else if (use + 1 == maxChannelsPerWorker) {
void retain() {
if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
manager.onWorkerFullEvent(this);
}
}
void decrementUseCounter() {
int use = useCounter.getAndDecrement();
if (use == 1) {
void release() {
int use = retainCounter.decrementAndGet();
if (use == 0) {
manager.onWorkerEmptyEvent(this);
} else if (use == maxChannelsPerWorker) {
} else if (use == maxChannelsPerWorker - 1) {
manager.onWorkerNotFullEvent(this);
}
}
boolean isRunning() {
return useCounter.get() != 0;
boolean isReleased() {
return retainCounter.get()==0;
}
public void addIoTask(Runnable work) {
ioTasks.add(work);
selector.wakeup();
}
private void processIoTasks() {
Runnable task;
while( (task= ioTasks.poll()) !=null ) {
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public void run() {
String origName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("Selector Worker: " + id);
while (isRunning()) {
while (!isReleased()) {
processIoTasks();
int count = selector.select(10);
lockBarrier();
int count = selector.select(10);
if (count == 0) {
continue;
}
if (!isRunning()) {
return;
}
// Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O.
Set keys = selector.selectedKeys();
@ -92,7 +106,9 @@ public class SelectorWorker implements Runnable {
final SelectorSelection s = (SelectorSelection)key.attachment();
try {
s.disable();
if( key.isValid() ) {
key.interestOps(0);
}
// Kick off another thread to find newly selected keys
// while we process the
@ -115,11 +131,8 @@ public class SelectorWorker implements Runnable {
}
}
} catch (IOException e) {
// Don't accept any more slections
manager.onWorkerEmptyEvent(this);
} catch (Throwable e) {
e.printStackTrace();
// Notify all the selections that the error occurred.
Set keys = selector.keys();
for (Iterator i = keys.iterator(); i.hasNext();) {
@ -127,24 +140,15 @@ public class SelectorWorker implements Runnable {
SelectorSelection s = (SelectorSelection)key.attachment();
s.onError(e);
}
} finally {
try {
manager.onWorkerEmptyEvent(this);
selector.close();
} catch (IOException ignore) {
ignore.printStackTrace();
}
Thread.currentThread().setName(origName);
}
}
private void lockBarrier() {
selectorLock.writeLock().lock();
selectorLock.writeLock().unlock();
}
public void lock() {
selectorLock.readLock().lock();
selector.wakeup();
}
public void unlock() {
selectorLock.readLock().unlock();
}
}

View File

@ -25,10 +25,6 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
public class StompConnection {

View File

@ -16,8 +16,9 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@ -30,11 +31,12 @@ import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOBufferedInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@ -49,6 +51,10 @@ public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
private SelectorSelection selection;
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
int previousByte = -1;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
@ -76,17 +82,52 @@ public class StompNIOTransport extends TcpTransport {
}
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
}
private void serviceRead() {
try {
DataInputStream in = new DataInputStream(new NIOBufferedInputStream(channel, 8 * 1024));
while (true) {
Object command = wireFormat.unmarshal(in);
doConsume((Command)command);
}
while (true) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
selection.close();
break;
}
// nothing more to read, break
if (readSize == 0) {
break;
}
inputBuffer.flip();
int b;
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
int i = 0;
while(i++ < readSize) {
b = input.read();
// skip repeating nulls
if (previousByte == 0 && b == 0) {
continue;
}
currentCommand.write(b);
// end of command reached, unmarshal
if (b == 0) {
Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
doConsume((Command)command);
currentCommand.reset();
}
previousByte = b;
}
// clear the buffer
inputBuffer.clear();
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
@ -101,7 +142,11 @@ public class StompNIOTransport extends TcpTransport {
}
protected void doStop(ServiceStopper stopper) throws Exception {
selection.disable();
try {
selection.close();
} catch (Exception e) {
e.printStackTrace();
}
super.doStop(stopper);
}
}

View File

@ -0,0 +1,132 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Use Stomp protocol
For better behavior under heavy usage, be sure to:
1. Give broker enough memory
2. Disable dedicated task runner
e.g. ACTIVEMQ_OPTS="-Xmx1024M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"
To run ActiveMQ with this configuration add xbean:conf/activemq-stomp.xml to your command
e.g. $ bin/activemq xbean:conf/activemq-stomp.xml
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<!--
For better performances use VM cursor and small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="false">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61612?transport.closeAsync=false"/>
<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
</transportConnectors>
</broker>
<!--
Uncomment to enable Camel
Take a look at activemq-camel.xml for more details
<import resource="camel.xml"/>
-->
<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at activemq-jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>