This closes #144

This commit is contained in:
Clebert Suconic 2015-08-28 19:01:46 -04:00
commit aecea5142d
1 changed files with 1 additions and 61 deletions

View File

@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -94,7 +93,6 @@ import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
@ -124,29 +122,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private OpenWireFormat wireFormat;
private boolean faultTolerantConnection;
private AMQConnectionContext context;
private boolean manageable;
private boolean pendingStop;
private Throwable stopError = null;
// should come from activemq server
private final TaskRunnerFactory stopTaskRunnerFactory = null;
private boolean starting;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private final CountDownLatch stopped = new CountDownLatch(1);
private boolean active;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
private boolean inServiceException;
@ -547,7 +532,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
state.reset(info);
this.faultTolerantConnection = info.isFaultTolerant();
// Setup the context.
String clientId = info.getClientId();
context.setBroker(protocolManager);
@ -558,11 +542,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// for now we pass the manager as the connector and see what happens
// it should be related to activemq's Acceptor
context.setConnector(this.acceptorUsed);
context.setFaultTolerant(faultTolerantConnection);
context.setFaultTolerant(info.isFaultTolerant());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
context.setReconnect(info.isFailoverReconnect());
this.manageable = info.isManageable();
context.setConnectionState(state);
if (info.getClientIp() == null) {
info.setClientIp(getRemoteAddress());
@ -705,22 +688,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
pendingStop = true;
stopError = cause;
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(waitTime);
stopAsync();
}
catch (InterruptedException e) {
}
}
});
}
catch (Throwable t) {
// log error
}
}
}
@ -728,37 +695,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
pendingStop = true;
if (starting) {
// log
return;
}
}
if (stopping.compareAndSet(false, true)) {
if (context != null) {
context.getStopping().set(true);
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
serviceLock.writeLock().lock();
try {
doStop();
}
catch (Throwable e) {
// LOG
}
finally {
stopped.countDown();
serviceLock.writeLock().unlock();
}
}
});
}
catch (Throwable t) {
// LOG
stopped.countDown();
}
}
}
@ -777,7 +718,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// log
}
active = false;
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
synchronized (dispatchQueue) {