mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2440 - more stomp+nip (and just nio) transport improvements
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@898774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a593e35d82
commit
db6827346b
|
@ -79,11 +79,6 @@ public final class SelectorManager {
|
|||
|
||||
public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
|
||||
freeWorkers.remove(worker);
|
||||
try {
|
||||
// no more connections on worker, close it
|
||||
worker.close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
@ -64,11 +65,15 @@ public final class SelectorSelection {
|
|||
|
||||
public void close() {
|
||||
worker.decrementUseCounter();
|
||||
|
||||
|
||||
// Lock when mutating state of the selector
|
||||
worker.lock();
|
||||
try {
|
||||
key.cancel();
|
||||
if (!worker.isRunning()) {
|
||||
worker.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
} finally {
|
||||
worker.unlock();
|
||||
}
|
||||
|
|
|
@ -17,11 +17,11 @@
|
|||
package org.apache.activemq.transport.nio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -72,7 +72,8 @@ public class SelectorWorker implements Runnable {
|
|||
Thread.currentThread().setName("Selector Worker: " + id);
|
||||
while (isRunning()) {
|
||||
|
||||
lockBarrier();
|
||||
lockBarrier();
|
||||
|
||||
int count = selector.select(10);
|
||||
if (count == 0) {
|
||||
continue;
|
||||
|
@ -115,8 +116,10 @@ public class SelectorWorker implements Runnable {
|
|||
}
|
||||
|
||||
}
|
||||
} catch (ClosedSelectorException cse) {
|
||||
// Don't accept any more selections
|
||||
manager.onWorkerEmptyEvent(this);
|
||||
} catch (IOException e) {
|
||||
|
||||
// Don't accept any more selections
|
||||
manager.onWorkerEmptyEvent(this);
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
@ -32,7 +31,6 @@ import javax.net.SocketFactory;
|
|||
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.nio.NIOInputStream;
|
||||
import org.apache.activemq.transport.nio.NIOOutputStream;
|
||||
import org.apache.activemq.transport.nio.SelectorManager;
|
||||
import org.apache.activemq.transport.nio.SelectorSelection;
|
||||
|
@ -131,8 +129,7 @@ public class StompNIOTransport extends TcpTransport {
|
|||
|
||||
}
|
||||
} catch (IOException e) {
|
||||
selection.close();
|
||||
onException(e);
|
||||
onException(e);
|
||||
} catch (Throwable e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
}
|
||||
|
@ -145,7 +142,10 @@ public class StompNIOTransport extends TcpTransport {
|
|||
}
|
||||
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
selection.disable();
|
||||
try {
|
||||
selection.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
super.doStop(stopper);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue