mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2440 - fixing bug on marking worker as non empty
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@920827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
54fa83da8b
commit
3cbe3f1f92
|
@ -99,7 +99,7 @@ public final class SelectorManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
|
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
|
||||||
freeWorkers.add(worker);
|
freeWorkers.addFirst(worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Executor getChannelExecutor() {
|
public Executor getChannelExecutor() {
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class SelectorWorker implements Runnable {
|
||||||
int use = retainCounter.decrementAndGet();
|
int use = retainCounter.decrementAndGet();
|
||||||
if (use == 0) {
|
if (use == 0) {
|
||||||
manager.onWorkerEmptyEvent(this);
|
manager.onWorkerEmptyEvent(this);
|
||||||
} else if (use < maxChannelsPerWorker) {
|
} else if (use == maxChannelsPerWorker - 1) {
|
||||||
manager.onWorkerNotFullEvent(this);
|
manager.onWorkerNotFullEvent(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,18 +68,15 @@ public class SelectorWorker implements Runnable {
|
||||||
selector.wakeup();
|
selector.wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean processIoTasks() {
|
private void processIoTasks() {
|
||||||
boolean rc = false;
|
|
||||||
Runnable task;
|
Runnable task;
|
||||||
while( (task= ioTasks.poll()) !=null ) {
|
while( (task= ioTasks.poll()) !=null ) {
|
||||||
try {
|
try {
|
||||||
rc = true;
|
|
||||||
task.run();
|
task.run();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,11 +87,11 @@ public class SelectorWorker implements Runnable {
|
||||||
try {
|
try {
|
||||||
Thread.currentThread().setName("Selector Worker: " + id);
|
Thread.currentThread().setName("Selector Worker: " + id);
|
||||||
while (!isReleased()) {
|
while (!isReleased()) {
|
||||||
|
|
||||||
if( processIoTasks() ) {
|
processIoTasks();
|
||||||
continue;
|
|
||||||
}
|
int count = selector.select(10);
|
||||||
int count = selector.select(10);
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -134,8 +131,8 @@ public class SelectorWorker implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
} catch (Throwable e) {
|
e.printStackTrace();
|
||||||
// Notify all the selections that the error occurred.
|
// Notify all the selections that the error occurred.
|
||||||
Set keys = selector.keys();
|
Set keys = selector.keys();
|
||||||
for (Iterator i = keys.iterator(); i.hasNext();) {
|
for (Iterator i = keys.iterator(); i.hasNext();) {
|
||||||
|
@ -143,12 +140,12 @@ public class SelectorWorker implements Runnable {
|
||||||
SelectorSelection s = (SelectorSelection)key.attachment();
|
SelectorSelection s = (SelectorSelection)key.attachment();
|
||||||
s.onError(e);
|
s.onError(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
manager.onWorkerEmptyEvent(this);
|
manager.onWorkerEmptyEvent(this);
|
||||||
selector.close();
|
selector.close();
|
||||||
} catch (IOException ignore) {
|
} catch (IOException ignore) {
|
||||||
|
ignore.printStackTrace();
|
||||||
}
|
}
|
||||||
Thread.currentThread().setName(origName);
|
Thread.currentThread().setName(origName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,7 @@ public class StompNIOTransport extends TcpTransport {
|
||||||
try {
|
try {
|
||||||
selection.close();
|
selection.close();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
super.doStop(stopper);
|
super.doStop(stopper);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue