Ensure that executor threads are created as daemon threads, fix a
try/finaly block, clean up some warnings.
This commit is contained in:
Timothy Bish 2016-01-14 16:47:23 -05:00
parent ebcc1b4eae
commit 5adbafef3b
3 changed files with 29 additions and 32 deletions

View File

@ -50,8 +50,8 @@ public final class SelectorManager {
@Override
public Thread newThread(Runnable runnable) {
this.i++;
final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + (i++));
t.setDaemon(false);
return t;
}
});

View File

@ -86,8 +86,9 @@ public final class SelectorSelection {
try {
key.cancel();
} catch (CancelledKeyException e) {
} finally {
worker.release();
}
worker.release();
}
});
}

View File

@ -35,7 +35,7 @@ public class SelectorWorker implements Runnable {
final AtomicInteger retainCounter = new AtomicInteger(1);
private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
selector = Selector.open();
@ -57,20 +57,19 @@ public class SelectorWorker implements Runnable {
manager.onWorkerNotFullEvent(this);
}
}
boolean isReleased() {
return retainCounter.get()==0;
}
boolean isReleased() {
return retainCounter.get() == 0;
}
public void addIoTask(Runnable work) {
ioTasks.add(work);
selector.wakeup();
}
private void processIoTasks() {
Runnable task;
while( (task= ioTasks.poll()) !=null ) {
Runnable task;
while ((task = ioTasks.poll()) != null) {
try {
task.run();
} catch (Throwable e) {
@ -79,34 +78,33 @@ public class SelectorWorker implements Runnable {
}
}
@Override
public void run() {
String origName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("Selector Worker: " + id);
while (!isReleased()) {
processIoTasks();
int count = selector.select(10);
processIoTasks();
int count = selector.select(10);
if (count == 0) {
continue;
}
// Get a java.util.Set containing the SelectionKey objects
// for all channels that are ready for I/O.
Set keys = selector.selectedKeys();
Set<SelectionKey> keys = selector.selectedKeys();
for (Iterator i = keys.iterator(); i.hasNext();) {
final SelectionKey key = (SelectionKey)i.next();
for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
final SelectionKey key = i.next();
i.remove();
final SelectorSelection s = (SelectorSelection)key.attachment();
final SelectorSelection s = (SelectorSelection) key.attachment();
try {
if( key.isValid() ) {
if (key.isValid()) {
key.interestOps(0);
}
@ -114,6 +112,7 @@ public class SelectorWorker implements Runnable {
// while we process the
// currently selected keys
manager.getChannelExecutor().execute(new Runnable() {
@Override
public void run() {
try {
s.onSelect();
@ -127,17 +126,15 @@ public class SelectorWorker implements Runnable {
} catch (Throwable e) {
s.onError(e);
}
}
}
} catch (Throwable e) {
} catch (Throwable e) {
e.printStackTrace();
// Notify all the selections that the error occurred.
Set keys = selector.keys();
for (Iterator i = keys.iterator(); i.hasNext();) {
SelectionKey key = (SelectionKey)i.next();
SelectorSelection s = (SelectorSelection)key.attachment();
Set<SelectionKey> keys = selector.keys();
for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
SelectionKey key = i.next();
SelectorSelection s = (SelectorSelection) key.attachment();
s.onError(e);
}
} finally {
@ -145,10 +142,9 @@ public class SelectorWorker implements Runnable {
manager.onWorkerEmptyEvent(this);
selector.close();
} catch (IOException ignore) {
ignore.printStackTrace();
ignore.printStackTrace();
}
Thread.currentThread().setName(origName);
}
}
}