ARTEMIS-3223 - ensure distribution uses the address from the message, rather than the address from the queue which may be a wildcard sub and not valid for publishng on, fix and test

This commit is contained in:
gtully 2021-07-26 22:54:36 +01:00 committed by Gary Tully
parent 224b89810d
commit e985df77fb
5 changed files with 135 additions and 19 deletions

View File

@ -1289,29 +1289,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
copyRedistribute.setAddress(originatingQueue.getAddress());
if (tx != null) {
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterRollback(Transaction tx) {
try {
//this will cause large message file to be
//cleaned up
// copyRedistribute.refDown();
} catch (Exception e) {
logger.warn("Failed to clean up message: " + copyRedistribute);
}
}
});
}
copyRedistribute.setAddress(message.getAddress());
RoutingContext context = new RoutingContextImpl(tx);

View File

@ -1755,6 +1755,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222302, value = "Failed to deal with property {0} when converting message from core to OpenWire: {1}", format = Message.Format.MESSAGE_FORMAT)
void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222303, value = "Redistribution by {0} of messageID = {1} failed", format = Message.Format.MESSAGE_FORMAT)
void errorRedistributing(@Cause Throwable t, String queueName, long m);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -178,6 +178,7 @@ public class Redistributor implements Consumer {
queue.deliverAsync();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID());
try {
tx.rollback();
} catch (Exception e2) {

View File

@ -136,8 +136,8 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
notificationsQueue = SimpleString.toSimpleString("Notifications");
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
setupClusterConnection("cluster-1->2", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
setupClusterConnection("cluster-2->1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
server0.start();

View File

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
@ -100,6 +101,101 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));
assertNonWildcardTopic(message1);
assertNonWildcardTopic(message2);
assertNonWildcardTopic(message3);
assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);
} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
connection1.unsubscribe(topics);
connection1.disconnect();
}
if (connection2 != null) {
connection2.unsubscribe(topics);
connection2.disconnect();
}
}
}
@Test
public void verifyRedistribution() throws Exception {
final String TOPIC = "test/+/some/#";
final String clientId = "SubOne";
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');
setupServer(0, false, isNetty());
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
setupServer(1, false, isNetty());
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
// allow redistribution
AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedistributionDelay(0);
servers[0].getConfiguration().addAddressesSetting("#", addressSettings);
servers[1].getConfiguration().addAddressesSetting("#", addressSettings);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
BlockingConnection connection1 = null;
BlockingConnection connection2 = null;
try {
connection1 = retrieveMQTTConnection("tcp://localhost:61616");
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId);
// Subscribe to topics
Topic[] topics = {new Topic(TOPIC, QoS.EXACTLY_ONCE)};
connection2.subscribe(topics);
waitForBindings(0, TOPIC, 0, 0, true);
waitForBindings(1, TOPIC, 1, 1, true);
waitForBindings(0, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 0, 0, false);
// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";
String payload3 = "This is message 3";
connection1.publish("test/1/some/la", payload1.getBytes(), QoS.EXACTLY_ONCE, false);
connection1.publish("test/1/some/la", payload2.getBytes(), QoS.EXACTLY_ONCE, false);
connection1.publish("test/1/some/la", payload3.getBytes(), QoS.EXACTLY_ONCE, false);
waitForMessages(1, TOPIC, 3);
connection2.disconnect();
// force redistribution
connection2 = retrieveMQTTConnection("tcp://localhost:61616", clientId);
connection2.subscribe(topics);
Message message4 = connection2.receive(15, TimeUnit.SECONDS);
Message message5 = connection2.receive(5, TimeUnit.SECONDS);
Message message6 = connection2.receive(5, TimeUnit.SECONDS);
assertEquals(payload1, new String(message4.getPayload()));
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));
assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);
} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
@ -189,6 +285,14 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
assertEquals(payload2, new String(message5.getPayload()));
assertEquals(payload3, new String(message6.getPayload()));
assertNonWildcardTopic(message1);
assertNonWildcardTopic(message2);
assertNonWildcardTopic(message3);
assertNonWildcardTopic(message4);
assertNonWildcardTopic(message5);
assertNonWildcardTopic(message6);
} finally {
String[] topics = new String[]{TOPIC};
if (connection1 != null) {
@ -202,9 +306,31 @@ public class MqttClusterWildcardTest extends ClusterTestBase {
}
}
private void assertNonWildcardTopic(Message message1) {
assertNotNull(message1);
String payload = new String(message1.getPayload());
System.err.println("got payload: " + payload);
assertTrue(payload.contains("message"));
String topic = message1.getTopic();
System.err.println("got topic: " + topic);
assertTrue(!topic.contains("+"));
assertTrue(!topic.contains("*"));
assertTrue(!topic.contains("#"));
}
private static BlockingConnection retrieveMQTTConnection(String host) throws Exception {
return retrieveMQTTConnection(host, null);
}
private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
if (clientId != null) {
mqtt.setClientId(clientId);
mqtt.setCleanSession(false);
}
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;