Apply fix and add test for:

https://issues.apache.org/jira/browse/AMQ-5385
This commit is contained in:
Timothy Bish 2014-10-09 12:40:13 -04:00
parent 97c127d2d4
commit 62c20ebdcf
2 changed files with 56 additions and 15 deletions

View File

@ -245,23 +245,23 @@ public class RegionBroker extends EmptyBroker {
synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) {
if (context.isAllowLinkStealing()){
clientIdSet.remove(clientId);
if (oldContext.getConnection() != null) {
Connection connection = oldContext.getConnection();
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
if (connection instanceof TransportConnection){
if (context.isAllowLinkStealing()) {
clientIdSet.put(clientId, context);
if (oldContext.getConnection() != null) {
Connection connection = oldContext.getConnection();
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
if (connection instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) connection;
transportConnection.stopAsync();
}else{
connection.stop();
}
}else{
LOG.error("Not Connection for {}", oldContext);
}
}else{
transportConnection.stopAsync();
} else {
connection.stop();
}
} else {
LOG.error("No Connection found for {}", oldContext);
}
} else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress());
+ oldContext.getConnection().getRemoteAddress());
}
} else {
clientIdSet.put(clientId, context);

View File

@ -34,6 +34,7 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
@ -1227,6 +1228,46 @@ public class MQTTTest extends MQTTTestSupport {
connection2.disconnect();
}
@Test(timeout = 60 * 1000)
public void testRepeatedLinkStealing() throws Exception {
final String clientId = "duplicateClient";
final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<BlockingConnection>();
final String TOPICA = "TopicA";
for (int i = 1; i <= 10; ++i) {
LOG.info("Creating MQTT Connection {}", i);
MQTT mqtt = createMQTTConnection(clientId, false);
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
if (oldConnection.get() != null) {
assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !oldConnection.get().isConnected();
}
}));
}
oldConnection.set(connection);
}
oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
oldConnection.get().disconnect();
}
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
doTestJmsMapping("test.foo");