Commits: 'Add redistribution to old Anycast consumers' and 'Changed redistributionDelay-check to earlier, improved test, reverted changes in ClusterTestBase'

This commit is contained in:
AntonRoskvist 2021-12-05 20:21:50 +01:00 committed by clebertsuconic
parent f4de5c46c0
commit 7d129b36e9
2 changed files with 70 additions and 0 deletions

View File

@ -315,6 +315,39 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
queueInfos.put(clusterName, info);
if (distance < 1) {
//Binding added locally. If a matching remote binding with consumers exist, add a redistributor
Binding binding = getBinding(routingName);
if (binding != null) {
Queue queue = (Queue) binding.getBindable();
AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
long redistributionDelay = addressSettings.getRedistributionDelay();
if (redistributionDelay == -1) {
//No need to keep looking since redistribution is not enabled
break;
}
try {
Bindings bindings = getBindingsForAddress(address);
for (Binding bind : bindings.getBindings()) {
if (bind.isConnected() && bind instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteBinding = (RemoteQueueBinding) bind;
if (remoteBinding.consumerCount() > 0) {
queue.addRedistributor(redistributionDelay);
break;
}
}
}
} catch (Exception ignore) {
}
}
}
break;
}
case BINDING_REMOVED: {

View File

@ -794,6 +794,43 @@ public class MessageRedistributionTest extends ClusterTestBase {
verifyReceiveAll(2, 1);
}
@Test
public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistribution() throws Exception {
String address = "test.address";
String queue = "test.address";
String clusterAddress = "test";
AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
RoutingType routingType = RoutingType.ANYCAST;
getServer(0).getAddressSettingsRepository().addMatch(address, settings);
getServer(1).getAddressSettingsRepository().addMatch(address, settings);
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, address, queue, null, true, routingType);
addConsumer(0, 0, queue, null);
waitForBindings(0, address, 1, 1, true);
waitForBindings(1, address, 1, 1, false);
createQueue(1, address, queue, null, true, routingType);
waitForBindings(1, address, 1, 0, true);
waitForBindings(0, address, 1, 0, false);
waitForBindings(1, address, 1, 1, false);
final int noMessages = 10;
send(1, address, noMessages, true, null, null);
verifyReceiveAll(noMessages, 0);
}
@Test
public void testBackAndForth() throws Exception {
for (int i = 0; i < 10; i++) {