ARTEMIS-3776 Avoid Integer.MAX_VALUE overflow on openwire clients

Older versions of Openwire clients wil be affected by AMQ-6431.
As a result of the issue if the ID of the message>Integer.MAX_VALUE
a consumer configured with Failover and doing duplicate detection on the client
will not be able to process duplicate detection accordingly and miss messages.
This commit is contained in:
Clebert Suconic 2022-04-12 21:58:13 -04:00 committed by clebertsuconic
parent 316fe8a350
commit bc17acd6da
8 changed files with 267 additions and 2 deletions

View File

@ -624,8 +624,10 @@ public final class OpenWireMessageConverter {
mid = new MessageId(midString.toString());
} else {
//JMSMessageID should be started with "ID:" and needs to be globally unique (node + journal id)
String midd = "ID:" + serverNodeUUID + ":-1:-1:-1";
mid = new MessageId(midd, coreMessage.getMessageID());
// ARTEMIS-3776 due to AMQ-6431 some older clients will not be able to receive messages
// if using a failover schema due to the messageID overFlowing Integer.MAX_VALUE
String midd = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
mid = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
}
amqMsg.setMessageId(mid);

View File

@ -87,6 +87,10 @@ public class NullStorageManager implements StorageManager {
return dummy;
}
protected void setNextId(long id) {
idSequence.set(id);
}
public NullStorageManager() {
this(new IOCriticalErrorListener() {
@Override

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.nullpm;
import org.apache.activemq.artemis.core.persistence.StorageManager;
public class NullStorageAccessor {
public static void setNextID(StorageManager sm, long id) {
((NullStorageManager)sm).setNextId(id);
}
}

View File

@ -631,6 +631,27 @@
<variableName>HORNETQ-247</variableName>
</configuration>
</execution>
<execution>
<id>openwire-5.11</id>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:activemq-client:5.11.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>AMQ_5_11</variableName>
</configuration>
</execution>
</executions>
</plugin>
@ -691,6 +712,11 @@
<name>ARTEMIS-JAKARTAEE</name>
<value>${ARTEMIS-JAKARTAEE}</value>
</property>
<property>
<name>AMQ_5_11</name>
<value>${AMQ_5_11}</value>
</property>
<variableName>AMQ_5_11</variableName>
</systemProperties>
<skipTests>${skipCompatibilityTests}</skipTests>
<argLine>${modular.jdk.surefire.arg} -Djgroups.bind_addr=::1 ${activemq-surefire-argline}</argLine>

View File

@ -39,6 +39,7 @@ public class GroovyRun {
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
public static final String AMQ_5_11 = "AMQ_5_11";
public static Binding binding = new Binding();
public static GroovyShell shell = new GroovyShell(binding);

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.Connection
import javax.jms.MessageConsumer
import java.util.Date;
import java.util.UUID;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
connection = cf.createConnection();
final int numberOfMessages = Integer.parseInt(arg[0])
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("Test");
final MessageConsumer consumer = session.createConsumer(queue)
connection.start();
for (int i = 0; i < numberOfMessages; i++) {
final TextMessage tm = (TextMessage)consumer.receive(1000);
GroovyRun.assertNotNull(tm)
GroovyRun.assertEquals("m" + i, tm.getText());
}
connection.close();

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import javax.jms.Connection
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
{
final String url = "(tcp://localhost:61616)?ha=true&initialConnectAttempts=-1&reconnectAttempts=-1&retryInterval=1000&retryIntervalMultiplier=1.0"
final String queueName = "Test"
final int startMessages = Integer.parseInt(arg[0]);
final int numberOfMessages = Integer.parseInt(arg[1]);
Connection c;
try {
final ConnectionFactory cf = new ActiveMQJMSConnectionFactory(url);
c = cf.createConnection();
final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue q = s.createQueue(queueName);
final MessageProducer p = s.createProducer(q);
for (int i = startMessages; i < numberOfMessages; i++) {
final TextMessage m = s.createTextMessage("m" + i);
p.send(m);
}
}
finally {
if (c != null) c.close();
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageAccessor;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.AMQ_5_11;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspath(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
public class OldOpenWireTest extends ClasspathBase {
EmbeddedActiveMQ server;
@Before
public void setServer() throws Throwable {
ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
server = new EmbeddedActiveMQ();
server.setConfiguration(configuration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo("Test").addRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(new QueueConfiguration("Test").setDurable(true).setRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().addAddressInfo(new AddressInfo("DLQ").addRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(new QueueConfiguration("DLQ").setDurable(true).setRoutingType(RoutingType.ANYCAST));
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("DLQ")));
}
@After
public void shutdownServer() throws Throwable {
if (server != null) {
server.stop();
}
}
@Test
public void testIDOverflow() throws Throwable {
Queue queue = server.getActiveMQServer().locateQueue("Test");
Queue dlq = server.getActiveMQServer().locateQueue("DLQ");
NullStorageAccessor.setNextID(server.getActiveMQServer().getStorageManager(), Integer.MAX_VALUE);
evaluate(getClasspath(SNAPSHOT), "oldOpenWire/sendCore.groovy", "0", "10");
Wait.assertEquals(10L, queue::getMessageCount, 1000, 10);
NullStorageAccessor.setNextID(server.getActiveMQServer().getStorageManager(), Integer.MAX_VALUE * 2L);
evaluate(getClasspath(SNAPSHOT), "oldOpenWire/sendCore.groovy", "10", "20");
Wait.assertEquals(20L, queue::getMessageCount, 1000, 10);
evaluate(getClasspath(AMQ_5_11), "oldOpenWire/receiveOW.groovy", "20");
Wait.assertEquals(0L, queue::getMessageCount, 1000, 100);
Assert.assertEquals(0L, dlq.getMessageCount());
}
}