mirror of https://github.com/apache/activemq.git
resolve: https://issues.apache.org/activemq/browse/AMQ-2277 - patch applied with thanks
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@781312 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c43eda363b
commit
63baaf038a
|
@ -35,8 +35,16 @@ public final class SelectorSelection {
|
|||
public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
|
||||
this.worker = worker;
|
||||
this.listener = listener;
|
||||
this.key = socketChannel.register(worker.selector, 0, this);
|
||||
worker.incrementUseCounter();
|
||||
|
||||
// Lock when mutating state of the selector
|
||||
worker.lock();
|
||||
|
||||
try {
|
||||
this.key = socketChannel.register(worker.selector, 0, this);
|
||||
worker.incrementUseCounter();
|
||||
} finally {
|
||||
worker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setInterestOps(int ops) {
|
||||
|
@ -56,8 +64,14 @@ public final class SelectorSelection {
|
|||
|
||||
public void close() {
|
||||
worker.decrementUseCounter();
|
||||
key.cancel();
|
||||
worker.selector.wakeup();
|
||||
|
||||
// Lock when mutating state of the selector
|
||||
worker.lock();
|
||||
try {
|
||||
key.cancel();
|
||||
} finally {
|
||||
worker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void onSelect() {
|
||||
|
|
|
@ -21,7 +21,10 @@ import java.nio.channels.SelectionKey;
|
|||
import java.nio.channels.Selector;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class SelectorWorker implements Runnable {
|
||||
|
||||
|
@ -32,7 +35,8 @@ public class SelectorWorker implements Runnable {
|
|||
final int id = NEXT_ID.getAndIncrement();
|
||||
final AtomicInteger useCounter = new AtomicInteger();
|
||||
private final int maxChannelsPerWorker;
|
||||
|
||||
private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
|
||||
|
||||
public SelectorWorker(SelectorManager manager) throws IOException {
|
||||
this.manager = manager;
|
||||
selector = Selector.open();
|
||||
|
@ -67,7 +71,8 @@ public class SelectorWorker implements Runnable {
|
|||
try {
|
||||
Thread.currentThread().setName("Selector Worker: " + id);
|
||||
while (isRunning()) {
|
||||
|
||||
|
||||
lockBarrier();
|
||||
int count = selector.select(10);
|
||||
if (count == 0) {
|
||||
continue;
|
||||
|
@ -127,4 +132,19 @@ public class SelectorWorker implements Runnable {
|
|||
Thread.currentThread().setName(origName);
|
||||
}
|
||||
}
|
||||
|
||||
private void lockBarrier() {
|
||||
selectorLock.writeLock().lock();
|
||||
selectorLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
selectorLock.readLock().lock();
|
||||
selector.wakeup();
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
selectorLock.readLock().unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,25 +18,50 @@ package org.apache.activemq.broker;
|
|||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.Collections;
|
||||
|
||||
|
||||
public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
|
||||
@SuppressWarnings("unchecked")
|
||||
public class NioQueueSubscriptionTest extends QueueSubscriptionTest implements ExceptionListener {
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(NioQueueSubscriptionTest.class);
|
||||
|
||||
private Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
|
||||
|
||||
@Override
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory("tcp://localhost:62621");
|
||||
return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
//setMaxTestTime(20*60*1000);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?persistent=false&useJmx=true"));
|
||||
BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
|
||||
answer.getManagementContext().setCreateConnector(false);
|
||||
answer.setUseJmx(false);
|
||||
answer.setDeleteAllMessagesOnStartup(true);
|
||||
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
|
||||
final PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
|
@ -48,4 +73,40 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
|
|||
answer.setDestinationPolicy(policyMap);
|
||||
return answer;
|
||||
}
|
||||
|
||||
public void testLotsOfConcurrentConnections() throws Exception {
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
final ConnectionFactory factory = createConnectionFactory();
|
||||
final ExceptionListener listener = this;
|
||||
int connectionCount = 400;
|
||||
for (int i=0;i<connectionCount ;i++) {
|
||||
executor.execute(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
|
||||
connection.setExceptionListener(listener);
|
||||
connection.start();
|
||||
assertNotNull(connection.getBrokerName());
|
||||
connections.add(connection);
|
||||
} catch (Exception e) {
|
||||
exceptions.put(Thread.currentThread(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(30, TimeUnit.SECONDS);
|
||||
|
||||
if (!exceptions.isEmpty()) {
|
||||
LOG.error("" + exceptions.size() + " exceptions like", exceptions.values().iterator().next());
|
||||
fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next());
|
||||
}
|
||||
LOG.info("created " + connectionCount + " connecitons");
|
||||
}
|
||||
|
||||
public void onException(JMSException exception) {
|
||||
LOG.error("Exception on conneciton", exception);
|
||||
exceptions.put(Thread.currentThread(), exception);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue