ARTEMIS-2011 Fixing incompatibility of AddressSettings encode between versions

To fix this I added a retry on AddressSettings using code that's closer to the original version
This commit is contained in:
Clebert Suconic 2018-08-03 18:19:05 -04:00
parent 3bb7df2de4
commit b710df7844
6 changed files with 117 additions and 16 deletions

View File

@ -20,10 +20,10 @@ import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.Mergeable; import org.apache.activemq.artemis.core.settings.Mergeable;
import org.apache.activemq.artemis.utils.BufferHelper; import org.apache.activemq.artemis.utils.BufferHelper;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
@ -729,6 +729,17 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
@Override @Override
public void decode(ActiveMQBuffer buffer) { public void decode(ActiveMQBuffer buffer) {
int original = buffer.readerIndex();
try {
decode(buffer, false);
} catch (Throwable e) {
buffer.readerIndex(original);
// Try a compatible version where the wire was broken
decode(buffer, true);
}
}
public void decode(ActiveMQBuffer buffer, boolean tryCompatible) {
SimpleString policyStr = buffer.readNullableSimpleString(); SimpleString policyStr = buffer.readNullableSimpleString();
if (policyStr != null) { if (policyStr != null) {
@ -791,7 +802,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteQueues = BufferHelper.readNullableBoolean(buffer); autoDeleteQueues = BufferHelper.readNullableBoolean(buffer);
policyStr = buffer.readNullableSimpleString(); policyStr = tryCompatible ? null : buffer.readNullableSimpleString();
if (policyStr != null) { if (policyStr != null) {
configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString()); configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString());
@ -803,7 +814,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer); autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer);
policyStr = buffer.readNullableSimpleString(); policyStr = tryCompatible ? null : buffer.readNullableSimpleString();
if (policyStr != null) { if (policyStr != null) {
configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString()); configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString());

View File

@ -350,6 +350,50 @@
<variableName>ARTEMIS-240</variableName> <variableName>ARTEMIS-240</variableName>
</configuration> </configuration>
</execution> </execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>210-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.1.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.1.0</arg>
<arg>org.apache.activemq:artemis-cli:2.1.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<variableName>ARTEMIS-210</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>200-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.0.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.0.0</arg>
<arg>org.apache.activemq:artemis-cli:2.0.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<variableName>ARTEMIS-200</variableName>
</configuration>
</execution>
<execution> <execution>
<id>140-check</id> <id>140-check</id>
<phase>compile</phase> <phase>compile</phase>
@ -453,6 +497,14 @@
<name>ARTEMIS-SNAPSHOT</name> <name>ARTEMIS-SNAPSHOT</name>
<value>${ARTEMIS-SNAPSHOT}</value> <value>${ARTEMIS-SNAPSHOT}</value>
</property> </property>
<property>
<name>ARTEMIS-200</name>
<value>${ARTEMIS-200}</value>
</property>
<property>
<name>ARTEMIS-210</name>
<value>${ARTEMIS-210}</value>
</property>
<property> <property>
<name>ARTEMIS-240</name> <name>ARTEMIS-240</name>
<value>${ARTEMIS-240}</value> <value>${ARTEMIS-240}</value>

View File

@ -30,6 +30,8 @@ public class GroovyRun {
public static final String SNAPSHOT = "ARTEMIS-SNAPSHOT"; public static final String SNAPSHOT = "ARTEMIS-SNAPSHOT";
public static final String ONE_FIVE = "ARTEMIS-155"; public static final String ONE_FIVE = "ARTEMIS-155";
public static final String ONE_FOUR = "ARTEMIS-140"; public static final String ONE_FOUR = "ARTEMIS-140";
public static final String TWO_ZERO = "ARTEMIS-200";
public static final String TWO_ONE = "ARTEMIS-210";
public static final String TWO_FOUR = "ARTEMIS-240"; public static final String TWO_FOUR = "ARTEMIS-240";
public static final String HORNETQ_235 = "HORNETQ-235"; public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247"; public static final String HORNETQ_247 = "HORNETQ-247";

View File

@ -1,4 +1,7 @@
package servers package servers
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -17,15 +20,11 @@ package servers
*/ */
// starts an artemis server // starts an artemis server
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.JournalType import org.apache.activemq.artemis.core.server.JournalType
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
String folder = arg[0]; String folder = arg[0];
String id = arg[1]; String id = arg[1];
@ -63,3 +62,31 @@ server.start();
server.getJMSServerManager().createTopic(true, "topic"); server.getJMSServerManager().createTopic(true, "topic");
server.getJMSServerManager().createQueue(true, "queue", null, true); server.getJMSServerManager().createQueue(true, "queue", null, true);
if (setAddressSettings) {
// this is to force records that will have pittfals between versions
server.getJMSServerManager().getActiveMQServer().getActiveMQServerControl().
addAddressSettings("ad1", //@Parameter(desc = "an address match", name = "addressMatch") String addressMatch,
"dla", // @Parameter(desc = "the dead letter address setting", name = "DLA") String DLA,
"exp", //@Parameter(desc = "the expiry address setting", name = "expiryAddress") String expiryAddress,
0l, //@Parameter(desc = "the expiry delay setting", name = "expiryDelay") long expiryDelay,
false, //@Parameter(desc = "are any queues created for this address a last value queue", name = "lastValueQueue") boolean lastValueQueue,
1, //@Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
10 * 1024 * 1024, //@Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
1024 * 1024, //@Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
3, //@Parameter(desc = "the max number of pages in the soft memory cache", name = "pageMaxCacheSize") int pageMaxCacheSize,
0l, //@Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
0, //@Parameter(desc = "the redelivery delay multiplier", name = "redeliveryMultiplier") double redeliveryMultiplier,
0, //@Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
0, //@Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
false, //@Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,
"BLOCK", //@Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy,
1000, //@Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
1000, //@Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod,
"NOTIFY", //@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
true, //@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
true, // @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
true, //@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
true) //@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.compatibility;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -62,6 +64,8 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{null, TWO_ZERO, SNAPSHOT});
combinations.add(new Object[]{null, TWO_ONE, SNAPSHOT});
combinations.add(new Object[]{null, TWO_FOUR, SNAPSHOT}); combinations.add(new Object[]{null, TWO_FOUR, SNAPSHOT});
// the purpose on this one is just to validate the test itself. // the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all /// if it can't run against itself it won't work at all
@ -94,12 +98,12 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
@Test @Test
public void testSendReceive() throws Throwable { public void testSendReceive() throws Throwable {
setVariable(senderClassloader, "persistent", true); setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest"); startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader); stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true); setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest"); startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
setVariable(receiverClassloader, "latch", null); setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
@ -112,12 +116,12 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
@Test @Test
public void testSendReceiveQueueMetrics() throws Throwable { public void testSendReceiveQueueMetrics() throws Throwable {
setVariable(senderClassloader, "persistent", true); setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest"); startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader); stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true); setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest"); startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
setVariable(receiverClassloader, "latch", null); setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages"); evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
@ -132,14 +136,14 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
public void testSendReceiveSizeQueueMetricsPaging() throws Throwable { public void testSendReceiveSizeQueueMetricsPaging() throws Throwable {
setVariable(senderClassloader, "persistent", true); setVariable(senderClassloader, "persistent", true);
//Set max size to 1 to cause messages to immediately go to the paging store //Set max size to 1 to cause messages to immediately go to the paging store
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1)); startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1), true);
evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy"); evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
evaluate(senderClassloader, "journalcompatibility/ispaging.groovy"); evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
stopServer(senderClassloader); stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true); setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1)); startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1), false);
evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy"); evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");

View File

@ -190,6 +190,10 @@ public abstract class VersionedBaseTest {
} }
public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable { public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable {
startServer(folder, loader, serverName, globalMaxSize, false);
}
public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize, boolean setAddressSettings) throws Throwable {
folder.mkdirs(); folder.mkdirs();
String scriptToUse; String scriptToUse;
@ -201,6 +205,7 @@ public abstract class VersionedBaseTest {
scriptToUse = "servers/hornetqServer.groovy"; scriptToUse = "servers/hornetqServer.groovy";
} }
setVariable(loader, "setAddressSettings", setAddressSettings);
evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize); evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize);
} }
public void stopServer(ClassLoader loader) throws Throwable { public void stopServer(ClassLoader loader) throws Throwable {