This commit is contained in:
Clebert Suconic 2020-05-11 14:56:24 -04:00
commit b2041de93b
2 changed files with 98 additions and 21 deletions

View File

@ -240,9 +240,9 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
synchronized (this) {
matchingDiverts.entrySet().forEach(entry -> {
if (entry.getKey().getDivert().getForwardAddress().equals(queue.getAddress())) {
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(entry.getKey().getAddress());
//check if the queue has been tracked by this divert and if so remove the consumer
if (entry.getValue().remove(queue)) {
if (entry.getValue().remove(queue.getAddress())) {
removeRemoteConsumer(getKey(addressInfo));
}
}

View File

@ -186,30 +186,66 @@ public class FederatedAddressTest extends FederatedTestBase {
*/
//Test creating address first followed by divert
//Test creating divert before consumer
//Test destroy divert at end
@Test
public void testDownstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
testFederatedAddressDivert(true,true, true);
public void testDownstreamDivertAddressFirstAndDivertFirstDestroyDivert() throws Exception {
testFederatedAddressDivert(true,true, true, true);
}
//Test creating address first followed by divert
//Test creating divert before consumer
//Test destroy queue at end
@Test
public void testDownstreamDivertAddressFirstAndDivertFirstDestroyQueue() throws Exception {
testFederatedAddressDivert(true,true, true, false);
}
//Test creating divert first followed by address
//Test creating divert before consumer
//Test destroy divert at end
@Test
public void testDownstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
testFederatedAddressDivert(true,false, true);
public void testDownstreamDivertAddressSecondDivertFirstDestroyDivert() throws Exception {
testFederatedAddressDivert(true,false, true, true);
}
//Test creating divert first followed by address
//Test creating divert before consumer
//Test destroy divert at end
@Test
public void testDownstreamDivertAddressSecondDivertFirstDestroyQueue() throws Exception {
testFederatedAddressDivert(true,false, true, false);
}
//Test creating address first followed by divert
//Test creating consumer before divert
//Test destroy divert at end
@Test
public void testDownstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
testFederatedAddressDivert(true,true, false);
public void testDownstreamDivertAddressFirstDivertSecondDestroyDivert() throws Exception {
testFederatedAddressDivert(true,true, false, true);
}
//Test creating address first followed by divert
//Test creating consumer before divert
//Test destroy queue at end
@Test
public void testDownstreamDivertAddressFirstDivertSecondDestroyQueue() throws Exception {
testFederatedAddressDivert(true,true, false, false);
}
//Test creating divert first followed by address
//Test creating consumer before divert
//Test destroy divert at end
@Test
public void testDownstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
testFederatedAddressDivert(true,false, false);
public void testDownstreamDivertAddressAndDivertSecondDestroyDivert() throws Exception {
testFederatedAddressDivert(true,false, false, true);
}
//Test creating divert first followed by address
//Test creating consumer before divert
//Test destroy queue at end
@Test
public void testDownstreamDivertAddressAndDivertSecondDestroyQueue() throws Exception {
testFederatedAddressDivert(true,false, false, false);
}
/**
@ -217,33 +253,70 @@ public class FederatedAddressTest extends FederatedTestBase {
*/
//Test creating address first followed by divert
//Test creating divert before consumer
//Test destroy divert at end
@Test
public void testUpstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
testFederatedAddressDivert(false,true, true);
public void testUpstreamDivertAddressAndDivertFirstDestroyDivert() throws Exception {
testFederatedAddressDivert(false,true, true, true);
}
//Test creating address first followed by divert
//Test creating divert before consumer
//Test destroy queue at end
@Test
public void testUpstreamDivertAddressAndDivertFirstDestroyQueue() throws Exception {
testFederatedAddressDivert(false,true, true, false);
}
//Test creating divert first followed by address
//Test creating divert before consumer
//Test destroy divert at end
@Test
public void testUpstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
testFederatedAddressDivert(false,false, true);
public void testUpstreamDivertAddressSecondDivertFirstDestroyDivert() throws Exception {
testFederatedAddressDivert(false,false, true, true);
}
//Test creating divert first followed by address
//Test creating divert before consumer
//Test destroy queue at end
@Test
public void testUpstreamDivertAddressSecondDivertFirstDestroyQueue() throws Exception {
testFederatedAddressDivert(false,false, true, false);
}
//Test creating address first followed by divert
//Test creating consumer before divert
//Test destroy divert at end
@Test
public void testUpstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
testFederatedAddressDivert(false,true, false);
public void testUpstreamDivertAddressFirstDivertSecondDestroyDivert() throws Exception {
testFederatedAddressDivert(false,true, false, true);
}
//Test creating address first followed by divert
//Test creating consumer before divert
//Test destroy queue at end
@Test
public void testUpstreamDivertAddressFirstDivertSecondDestroyQueue() throws Exception {
testFederatedAddressDivert(false,true, false, false);
}
//Test creating divert first followed by address
//Test creating consumer before divert
//Test destroy divert at end
@Test
public void testUpstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
testFederatedAddressDivert(false,false, false);
public void testUpstreamsDivertAddressAndDivertSecondDestroyDivert() throws Exception {
testFederatedAddressDivert(false,false, false, true);
}
protected void testFederatedAddressDivert(boolean downstream, boolean addressFirst, boolean divertBeforeConsumer) throws Exception {
//Test creating divert first followed by address
//Test creating consumer before divert
//Test destroy queue at end
@Test
public void testUpstreamDivertAddressAndDivertSecondDestroyQueue() throws Exception {
testFederatedAddressDivert(false,false, false, false);
}
protected void testFederatedAddressDivert(boolean downstream, boolean addressFirst, boolean divertBeforeConsumer,
boolean destroyDivert) throws Exception {
String address = getName();
String address2 = "fedOneWayDivertTest";
@ -308,8 +381,12 @@ public class FederatedAddressTest extends FederatedTestBase {
assertNotNull(consumer0.receive(1000));
//Test consumer is cleaned up after divert destroyed
getServer(0).destroyDivert(SimpleString.toSimpleString(address + ":" + address2));
// getServer(0).destroyQueue(SimpleString.toSimpleString(address2));
if (destroyDivert) {
getServer(0).destroyDivert(SimpleString.toSimpleString(address + ":" + address2));
//Test consumer is cleaned up after queue destroyed
} else {
getServer(0).destroyQueue(SimpleString.toSimpleString(address2), null, false);
}
assertTrue(Wait.waitFor(() -> remoteQueueBinding.getQueue().getConsumerCount() == 0, 2000, 100));
}
}