Improve Broker.removeConnection() method, so that we pass the cause for removing it (if available)
This commit is contained in:
Dejan Bosanac 2015-05-12 14:22:15 +02:00
parent b679c8d4d2
commit e19293de5f
3 changed files with 25 additions and 8 deletions

View File

@ -117,7 +117,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected BrokerInfo brokerInfo;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
@ -152,7 +152,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private Throwable stopError = null;
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@ -245,7 +244,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
TRANSPORTLOG.warn(this + " failed: " + e);
}
stopAsync();
stopAsync(e);
}
}
@ -295,7 +294,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ce.setException(e);
dispatchSync(ce);
// Record the error that caused the transport to stop
this.stopError = e;
transportException.set(e);
// Wait a little bit to try to get the output buffer to flush
// the exception notification to the client.
try {
@ -334,7 +333,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (!pendingStop) {
response = command.visit(this);
} else {
response = new ExceptionResponse(this.stopError);
response = new ExceptionResponse(transportException.get());
}
} catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
@ -863,7 +862,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
iter.remove();
}
try {
broker.removeConnection(cs.getContext(), cs.getInfo(), null);
broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
} catch (Throwable e) {
SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
}
@ -1073,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (waitTime > 0) {
synchronized (this) {
pendingStop = true;
stopError = cause;
transportException.set(cause);
}
try {
stopTaskRunnerFactory.execute(new Runnable() {
@ -1093,6 +1092,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
public void stopAsync(Throwable cause) {
transportException.set(cause);
stopAsync();
}
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {

View File

@ -266,7 +266,7 @@ public class RegionBroker extends EmptyBroker {
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
if (connection instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) connection;
transportConnection.stopAsync();
transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
} else {
connection.stop();
}

View File

@ -18,20 +18,32 @@ package org.apache.activemq.broker;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ConnectionInfo;
import javax.jms.Connection;
import javax.jms.InvalidClientIDException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class LinkStealingTest extends TestCase {
protected BrokerService brokerService;
protected int timeOutInSeconds = 10;
protected final AtomicReference<Throwable> removeException = new AtomicReference<Throwable>();
@Override
protected void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
removeException.set(error);
super.removeConnection(context, info, error);
}
}
});
}
@Override
@ -86,6 +98,7 @@ public class LinkStealingTest extends TestCase {
exceptionFlag.set(true);
}
assertFalse(exceptionFlag.get());
assertNotNull(removeException.get());
}
}