From 662324caac2eb56e67ac65804aaf98232518cbdc Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 26 Feb 2010 17:14:40 +0000 Subject: [PATCH] Better selector synchonization to help resolve AMQ-2440 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@916762 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/nio/SelectorManager.java | 26 +++-- .../transport/nio/SelectorSelection.java | 71 ++++++++------ .../transport/nio/SelectorWorker.java | 94 +++++++++---------- .../transport/stomp/StompConnection.java | 4 - 4 files changed, 104 insertions(+), 91 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index 40ce1c7310..be5c424bf7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -61,15 +61,25 @@ public final class SelectorManager { public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener) throws IOException { - SelectorWorker worker = null; - if (freeWorkers.size() > 0) { - worker = freeWorkers.getFirst(); - } else { - worker = new SelectorWorker(this); - freeWorkers.addFirst(worker); + SelectorSelection selection = null; + while( selection == null ) { + if (freeWorkers.size() > 0) { + SelectorWorker worker = freeWorkers.getFirst(); + if( worker.isReleased() ) { + freeWorkers.remove(worker); + } else { + worker.retain(); + selection = new SelectorSelection(worker, socketChannel, listener); + } + + } else { + // Worker starts /w retain count of 1 + SelectorWorker worker = new SelectorWorker(this); + freeWorkers.addFirst(worker); + selection = new SelectorSelection(worker, socketChannel, listener); + } } - - SelectorSelection selection = new SelectorSelection(worker, socketChannel, listener); + return selection; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java index 7ff2e5b169..e96d50d78d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java @@ -16,10 +16,11 @@ */ package org.apache.activemq.transport.nio; -import java.io.IOException; +import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.transport.nio.SelectorManager.Listener; @@ -29,23 +30,23 @@ import org.apache.activemq.transport.nio.SelectorManager.Listener; public final class SelectorSelection { private final SelectorWorker worker; - private final SelectionKey key; private final Listener listener; private int interest; + private SelectionKey key; + private AtomicBoolean closed = new AtomicBoolean(); - public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException { + public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException { this.worker = worker; this.listener = listener; - - // Lock when mutating state of the selector - worker.lock(); - - try { - this.key = socketChannel.register(worker.selector, 0, this); - worker.incrementUseCounter(); - } finally { - worker.unlock(); - } + worker.addIoTask(new Runnable() { + public void run() { + try { + SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); } public void setInterestOps(int ops) { @@ -53,29 +54,39 @@ public final class SelectorSelection { } public void enable() { - key.interestOps(interest); - worker.selector.wakeup(); + worker.addIoTask(new Runnable() { + public void run() { + try { + key.interestOps(interest); + } catch (CancelledKeyException e) { + } + } + }); } public void disable() { - if (key.isValid()) { - key.interestOps(0); - } + worker.addIoTask(new Runnable() { + public void run() { + try { + key.interestOps(0); + } catch (CancelledKeyException e) { + } + } + }); } public void close() { - worker.decrementUseCounter(); - - // Lock when mutating state of the selector - worker.lock(); - try { - key.cancel(); - if (!worker.isRunning()) { - worker.close(); - } - } catch (IOException e) { - } finally { - worker.unlock(); + // guard against multiple closes. + if( closed.compareAndSet(false, true) ) { + worker.addIoTask(new Runnable() { + public void run() { + try { + key.cancel(); + } catch (CancelledKeyException e) { + } + worker.release(); + } + }); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java index 21cf5ee965..4413b3bc10 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java @@ -17,14 +17,12 @@ package org.apache.activemq.transport.nio; import java.io.IOException; -import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; public class SelectorWorker implements Runnable { @@ -33,56 +31,69 @@ public class SelectorWorker implements Runnable { final SelectorManager manager; final Selector selector; final int id = NEXT_ID.getAndIncrement(); - final AtomicInteger useCounter = new AtomicInteger(); private final int maxChannelsPerWorker; - private final ReadWriteLock selectorLock = new ReentrantReadWriteLock(); + + final AtomicInteger retainCounter = new AtomicInteger(1); + private final ConcurrentLinkedQueue ioTasks = new ConcurrentLinkedQueue(); public SelectorWorker(SelectorManager manager) throws IOException { this.manager = manager; selector = Selector.open(); maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); + manager.getSelectorExecutor().execute(this); } - void incrementUseCounter() { - int use = useCounter.getAndIncrement(); - if (use == 0) { - manager.getSelectorExecutor().execute(this); - } else if (use + 1 == maxChannelsPerWorker) { + void retain() { + if (retainCounter.incrementAndGet() == maxChannelsPerWorker) { manager.onWorkerFullEvent(this); } } - void decrementUseCounter() { - int use = useCounter.getAndDecrement(); - if (use == 1) { + void release() { + int use = retainCounter.decrementAndGet(); + if (use == 0) { manager.onWorkerEmptyEvent(this); - } else if (use == maxChannelsPerWorker) { + } else if (use < maxChannelsPerWorker) { manager.onWorkerNotFullEvent(this); } } - - boolean isRunning() { - return useCounter.get() != 0; + + boolean isReleased() { + return retainCounter.get()==0; } + + public void addIoTask(Runnable work) { + ioTasks.add(work); + selector.wakeup(); + } + + private void processIoTasks() { + Runnable task; + while( (task= ioTasks.poll()) !=null ) { + try { + task.run(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + + public void run() { String origName = Thread.currentThread().getName(); try { Thread.currentThread().setName("Selector Worker: " + id); - while (isRunning()) { - - lockBarrier(); + while (!isReleased()) { + processIoTasks(); int count = selector.select(10); if (count == 0) { continue; } - if (!isRunning()) { - return; - } - // Get a java.util.Set containing the SelectionKey objects // for all channels that are ready for I/O. Set keys = selector.selectedKeys(); @@ -93,7 +104,9 @@ public class SelectorWorker implements Runnable { final SelectorSelection s = (SelectorSelection)key.attachment(); try { - s.disable(); + if( key.isValid() ) { + key.interestOps(0); + } // Kick off another thread to find newly selected keys // while we process the @@ -116,13 +129,8 @@ public class SelectorWorker implements Runnable { } } - } catch (ClosedSelectorException cse) { - // Don't accept any more selections - manager.onWorkerEmptyEvent(this); - } catch (IOException e) { - // Don't accept any more selections - manager.onWorkerEmptyEvent(this); - + + } catch (Throwable e) { // Notify all the selections that the error occurred. Set keys = selector.keys(); for (Iterator i = keys.iterator(); i.hasNext();) { @@ -132,25 +140,13 @@ public class SelectorWorker implements Runnable { } } finally { + try { + manager.onWorkerEmptyEvent(this); + selector.close(); + } catch (IOException ignore) { + } Thread.currentThread().setName(origName); } } - private void lockBarrier() { - selectorLock.writeLock().lock(); - selectorLock.writeLock().unlock(); - } - - public void lock() { - selectorLock.readLock().lock(); - selector.wakeup(); - } - - public void unlock() { - selectorLock.readLock().unlock(); - } - - public void close() throws IOException { - selector.close(); - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java index db68061eaa..74fd56dc0f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java @@ -25,10 +25,6 @@ import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.HashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe; public class StompConnection {