Better selector synchonization to help resolve AMQ-2440

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@916762 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2010-02-26 17:14:40 +00:00
parent eac31e8417
commit 662324caac
4 changed files with 104 additions and 91 deletions

View File

@ -61,15 +61,25 @@ public final class SelectorManager {
public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
throws IOException { throws IOException {
SelectorWorker worker = null; SelectorSelection selection = null;
if (freeWorkers.size() > 0) { while( selection == null ) {
worker = freeWorkers.getFirst(); if (freeWorkers.size() > 0) {
} else { SelectorWorker worker = freeWorkers.getFirst();
worker = new SelectorWorker(this); if( worker.isReleased() ) {
freeWorkers.addFirst(worker); freeWorkers.remove(worker);
} else {
worker.retain();
selection = new SelectorSelection(worker, socketChannel, listener);
}
} else {
// Worker starts /w retain count of 1
SelectorWorker worker = new SelectorWorker(this);
freeWorkers.addFirst(worker);
selection = new SelectorSelection(worker, socketChannel, listener);
}
} }
SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener);
return selection; return selection;
} }

View File

@ -16,10 +16,11 @@
*/ */
package org.apache.activemq.transport.nio; package org.apache.activemq.transport.nio;
import java.io.IOException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.nio.SelectorManager.Listener; import org.apache.activemq.transport.nio.SelectorManager.Listener;
@ -29,23 +30,23 @@ import org.apache.activemq.transport.nio.SelectorManager.Listener;
public final class SelectorSelection { public final class SelectorSelection {
private final SelectorWorker worker; private final SelectorWorker worker;
private final SelectionKey key;
private final Listener listener; private final Listener listener;
private int interest; private int interest;
private SelectionKey key;
private AtomicBoolean closed = new AtomicBoolean();
public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException { public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
this.worker = worker; this.worker = worker;
this.listener = listener; this.listener = listener;
worker.addIoTask(new Runnable() {
// Lock when mutating state of the selector public void run() {
worker.lock(); try {
SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this);
try { } catch (Exception e) {
this.key = socketChannel.register(worker.selector, 0, this); e.printStackTrace();
worker.incrementUseCounter(); }
} finally { }
worker.unlock(); });
}
} }
public void setInterestOps(int ops) { public void setInterestOps(int ops) {
@ -53,29 +54,39 @@ public final class SelectorSelection {
} }
public void enable() { public void enable() {
key.interestOps(interest); worker.addIoTask(new Runnable() {
worker.selector.wakeup(); public void run() {
try {
key.interestOps(interest);
} catch (CancelledKeyException e) {
}
}
});
} }
public void disable() { public void disable() {
if (key.isValid()) { worker.addIoTask(new Runnable() {
key.interestOps(0); public void run() {
} try {
key.interestOps(0);
} catch (CancelledKeyException e) {
}
}
});
} }
public void close() { public void close() {
worker.decrementUseCounter(); // guard against multiple closes.
if( closed.compareAndSet(false, true) ) {
// Lock when mutating state of the selector worker.addIoTask(new Runnable() {
worker.lock(); public void run() {
try { try {
key.cancel(); key.cancel();
if (!worker.isRunning()) { } catch (CancelledKeyException e) {
worker.close(); }
} worker.release();
} catch (IOException e) { }
} finally { });
worker.unlock();
} }
} }

View File

@ -17,14 +17,12 @@
package org.apache.activemq.transport.nio; package org.apache.activemq.transport.nio;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SelectorWorker implements Runnable { public class SelectorWorker implements Runnable {
@ -33,56 +31,69 @@ public class SelectorWorker implements Runnable {
final SelectorManager manager; final SelectorManager manager;
final Selector selector; final Selector selector;
final int id = NEXT_ID.getAndIncrement(); final int id = NEXT_ID.getAndIncrement();
final AtomicInteger useCounter = new AtomicInteger();
private final int maxChannelsPerWorker; private final int maxChannelsPerWorker;
private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
final AtomicInteger retainCounter = new AtomicInteger(1);
private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
public SelectorWorker(SelectorManager manager) throws IOException { public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager; this.manager = manager;
selector = Selector.open(); selector = Selector.open();
maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
manager.getSelectorExecutor().execute(this);
} }
void incrementUseCounter() { void retain() {
int use = useCounter.getAndIncrement(); if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
if (use == 0) {
manager.getSelectorExecutor().execute(this);
} else if (use + 1 == maxChannelsPerWorker) {
manager.onWorkerFullEvent(this); manager.onWorkerFullEvent(this);
} }
} }
void decrementUseCounter() { void release() {
int use = useCounter.getAndDecrement(); int use = retainCounter.decrementAndGet();
if (use == 1) { if (use == 0) {
manager.onWorkerEmptyEvent(this); manager.onWorkerEmptyEvent(this);
} else if (use == maxChannelsPerWorker) { } else if (use < maxChannelsPerWorker) {
manager.onWorkerNotFullEvent(this); manager.onWorkerNotFullEvent(this);
} }
} }
boolean isRunning() { boolean isReleased() {
return useCounter.get() != 0; return retainCounter.get()==0;
} }
public void addIoTask(Runnable work) {
ioTasks.add(work);
selector.wakeup();
}
private void processIoTasks() {
Runnable task;
while( (task= ioTasks.poll()) !=null ) {
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public void run() { public void run() {
String origName = Thread.currentThread().getName(); String origName = Thread.currentThread().getName();
try { try {
Thread.currentThread().setName("Selector Worker: " + id); Thread.currentThread().setName("Selector Worker: " + id);
while (isRunning()) { while (!isReleased()) {
lockBarrier();
processIoTasks();
int count = selector.select(10); int count = selector.select(10);
if (count == 0) { if (count == 0) {
continue; continue;
} }
if (!isRunning()) {
return;
}
// Get a java.util.Set containing the SelectionKey objects // Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O. // for all channels that are ready for I/O.
Set keys = selector.selectedKeys(); Set keys = selector.selectedKeys();
@ -93,7 +104,9 @@ public class SelectorWorker implements Runnable {
final SelectorSelection s = (SelectorSelection)key.attachment(); final SelectorSelection s = (SelectorSelection)key.attachment();
try { try {
s.disable(); if( key.isValid() ) {
key.interestOps(0);
}
// Kick off another thread to find newly selected keys // Kick off another thread to find newly selected keys
// while we process the // while we process the
@ -116,13 +129,8 @@ public class SelectorWorker implements Runnable {
} }
} }
} catch (ClosedSelectorException cse) {
// Don't accept any more selections } catch (Throwable e) {
manager.onWorkerEmptyEvent(this);
} catch (IOException e) {
// Don't accept any more selections
manager.onWorkerEmptyEvent(this);
// Notify all the selections that the error occurred. // Notify all the selections that the error occurred.
Set keys = selector.keys(); Set keys = selector.keys();
for (Iterator i = keys.iterator(); i.hasNext();) { for (Iterator i = keys.iterator(); i.hasNext();) {
@ -132,25 +140,13 @@ public class SelectorWorker implements Runnable {
} }
} finally { } finally {
try {
manager.onWorkerEmptyEvent(this);
selector.close();
} catch (IOException ignore) {
}
Thread.currentThread().setName(origName); Thread.currentThread().setName(origName);
} }
} }
private void lockBarrier() {
selectorLock.writeLock().lock();
selectorLock.writeLock().unlock();
}
public void lock() {
selectorLock.readLock().lock();
selector.wakeup();
}
public void unlock() {
selectorLock.readLock().unlock();
}
public void close() throws IOException {
selector.close();
}
} }

View File

@ -25,10 +25,6 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.HashMap; import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
public class StompConnection { public class StompConnection {