ARTEMIS-4570 filter not applied to all brokers in cluster

This commit is contained in:
Howard Gao 2024-01-15 18:39:14 +08:00 committed by clebertsuconic
parent 4b78cabe4e
commit 07b02159d2
10 changed files with 115 additions and 4 deletions

View File

@ -47,7 +47,8 @@ public enum CoreNotificationType implements NotificationType {
SESSION_CREATED(26),
SESSION_CLOSED(27),
MESSAGE_DELIVERED(28),
MESSAGE_EXPIRED(29);
MESSAGE_EXPIRED(29),
BINDING_UPDATED(30);
private final int value;

View File

@ -801,6 +801,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if ((forceUpdate || newFilter != oldFilter) && !Objects.equals(oldFilter, newFilter)) {
changed = true;
queue.setFilter(newFilter);
notifyBindingUpdatedForQueue(queueBinding);
}
if ((forceUpdate || queueConfiguration.isConfigurationManaged() != null) && !Objects.equals(queueConfiguration.isConfigurationManaged(), queue.isConfigurationManaged())) {
changed = true;
@ -836,6 +837,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
public void notifyBindingUpdatedForQueue(QueueBinding binding) throws Exception {
//only the filter could be updated
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
Filter filter = binding.getFilter();
if (filter != null) {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
}
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
String uid = UUIDGenerator.getInstance().generateStringUUID();
logger.debug("ClusterCommunication::Sending notification for updateBinding {} from server {}", binding, server);
managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_UPDATED, props));
}
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes) throws Exception {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -34,5 +35,7 @@ public interface RemoteQueueBinding extends QueueBinding {
long getRemoteQueueID();
void setFilter(Filter filter);
MessageLoadBalancingType getMessageLoadBalancingType();
}

View File

@ -258,6 +258,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
"', '" +
CoreNotificationType.BINDING_REMOVED +
"', '" +
CoreNotificationType.BINDING_UPDATED +
"', '" +
CoreNotificationType.CONSUMER_CREATED +
"', '" +
CoreNotificationType.CONSUMER_CLOSED +

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyManager;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@ -1134,6 +1135,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
break;
}
case BINDING_UPDATED: {
doBindingUpdated(message);
break;
}
case CONSUMER_CREATED: {
doConsumerCreated(message);
@ -1269,6 +1274,22 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
}
}
private synchronized void doBindingUpdated(final ClientMessage message) throws Exception {
logger.trace("{} Update binding {}", ClusterConnectionImpl.this, message);
if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
throw new IllegalStateException("clusterName is null");
}
SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName);
if (existingBinding != null) {
existingBinding.setFilter(FilterImpl.createFilter(filterString));
}
}
private synchronized void doBindingAdded(final ClientMessage message) throws Exception {
logger.trace("{} Adding binding {}", ClusterConnectionImpl.this, message);
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {

View File

@ -50,12 +50,12 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
private final long remoteQueueID;
private final Filter queueFilter;
private final Set<Filter> filters = new HashSet<>();
private final Map<SimpleString, Integer> filterCounts = new HashMap<>();
private Filter queueFilter;
private int consumerCount;
private final SimpleString idsHeaderName;
@ -351,6 +351,11 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
return remoteQueueID;
}
@Override
public void setFilter(Filter filter) {
this.queueFilter = filter;
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;

View File

@ -4322,7 +4322,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
.setTemporary(temporary)
.setInternal(internalQueue)
.setTransient(refCountForConsumers instanceof TransientQueueManagerImpl)
.setAutoCreated(autoCreated);
.setAutoCreated(autoCreated)
.setEnabled(enabled)
.setGroupRebalancePauseDispatch(groupRebalancePauseDispatch);
}
protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {

View File

@ -16,9 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -86,6 +94,53 @@ public class MessageLoadBalancingTest extends ClusterTestBase {
Assert.assertNull(clientMessage);
}
@Test
public void testMessageLoadBalancingWithFiltersUpdate() throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, false);
Binding[] bindings = new Binding[2];
PostOffice[] po = new PostOffice[2];
for (int i = 0; i < 2; i++) {
po[i] = servers[i].getPostOffice();
bindings[i] = po[i].getBinding(new SimpleString("queue0"));
Assert.assertNotNull(bindings[i]);
Queue queue0 = (Queue)bindings[i].getBindable();
Assert.assertNotNull(queue0);
QueueConfiguration qconfig = queue0.getQueueConfiguration();
Assert.assertNotNull(qconfig);
qconfig.setFilterString("color = 'red'");
po[i].updateQueue(qconfig, true);
}
SimpleString clusterName0 = bindings[1].getClusterName();
RemoteQueueBinding remoteBinding = (RemoteQueueBinding) po[0].getBinding(clusterName0);
Assert.assertNotNull(remoteBinding);
Wait.assertEquals("color = 'red'", () -> {
Filter filter = remoteBinding.getFilter();
if (filter == null) {
return filter;
}
return filter.getFilterString().toString();
});
}
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);

View File

@ -38,6 +38,7 @@ import org.junit.Assert;
import org.junit.Test;
public class ConfigurationTest extends ActiveMQTestBase {
@Test
public void testStartWithDuplicateQueues() throws Exception {
ActiveMQServer server = getActiveMQServer("duplicate-queues.xml");

View File

@ -514,6 +514,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
return 0;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;