From b8a2580410ac12a3b2e0ab7222bab4669d1edb02 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 29 May 2019 10:56:38 -0400 Subject: [PATCH] ARTEMIS-2360 IOOBE when dealing with older clients concurrently --- .../activemq/artemis/api/core/Message.java | 4 +- .../core/message/impl/CoreMessage.java | 2 +- .../wireformat/SessionReceiveMessage_1X.java | 13 ++ .../hornetq/PropertiesConversionTest.java | 93 ++++++++++++- .../resources/hqsoak/artemisServer.groovy | 70 ++++++++++ .../resources/hqsoak/receiveMessages.groovy | 105 ++++++++++++++ .../main/resources/hqsoak/sendMessages.groovy | 109 +++++++++++++++ .../tests/compatibility/HornetQSoakTest.java | 129 ++++++++++++++++++ .../compatibility/base/ClasspathBase.java | 17 +++ .../compatibility/base/VersionedBase.java | 7 +- 10 files changed, 534 insertions(+), 15 deletions(-) create mode 100644 tests/compatibility-tests/src/main/resources/hqsoak/artemisServer.groovy create mode 100644 tests/compatibility-tests/src/main/resources/hqsoak/receiveMessages.groovy create mode 100644 tests/compatibility-tests/src/main/resources/hqsoak/sendMessages.groovy create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HornetQSoakTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index e701482a48..8731f8a59e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -228,7 +228,9 @@ public interface Message { return this; } - + /** + * WARNING: Calling this method on a AMQPMessage will allow the non mutable part of the message to be modified. + */ void messageChanged(); /** Used to calculate what is the delivery time. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 06304bc2c8..3596720ab4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -403,7 +403,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public void messageChanged() { + public synchronized void messageChanged() { //a volatile store is a costly operation: better to check if is necessary if (validBuffer) { validBuffer = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java index 24758379d5..6b5f98171b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage_1X.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public class SessionReceiveMessage_1X extends SessionReceiveMessage { @@ -40,6 +41,18 @@ public class SessionReceiveMessage_1X extends SessionReceiveMessage { buffer.writeInt(deliveryCount); } + @Override + public ActiveMQBuffer encode(final CoreRemotingConnection connection) { + ICoreMessage messageSynchronize = message; + synchronized (messageSynchronize) { + // messageSynchronize.messageChanged(); + // there's a possible race, in case non standard components touch the message body + // between allocating and actual encoding there could be a difference in size + // sendBuffer_1X would fail + return super.encode(connection); + } + } + @Override protected void receiveMessage(ByteBuf buffer) { message.receiveBuffer_1X(buffer); diff --git a/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java b/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java index 67fb3e93a1..9094d03ca5 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/hornetq/PropertiesConversionTest.java @@ -18,15 +18,17 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Test; @@ -51,15 +53,16 @@ public class PropertiesConversionTest { } @Test - public void testParallelConversions() throws Throwable { + public void testParallelHornetQConversions() throws Throwable { CoreMessage coreMessage = new CoreMessage(1, 1024); for (int i = 0; i < 10; i++) { coreMessage.putBooleanProperty(SimpleString.toSimpleString("key1"), true); } coreMessage.putStringProperty(new SimpleString("_HQ_ORIG_ADDRESS"), SimpleString.toSimpleString("hqOne")); coreMessage.putStringProperty(new SimpleString("_AMQ_ORIG_QUEUE"), SimpleString.toSimpleString("amqOne")); + coreMessage.putStringProperty(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), SimpleString.toSimpleString("asdfkhaksdjfhaskfdjhas")); - int threads = 1000; + int threads = 100; int conversions = 100; Thread[] t = new Thread[threads]; CyclicBarrier barrier = new CyclicBarrier(threads); @@ -94,9 +97,10 @@ public class PropertiesConversionTest { packetRec.getMessage().putStringProperty("propChanges", "short one"); } - ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(packetRec.getMessage().getEncodeSize() + 4); - packetRec.getMessage().sendBuffer_1X(buf); - buf.release(); + SessionReceiveMessage_1X receiveMessage_1X = new SessionReceiveMessage_1X(coreMessage); + ActiveMQBuffer buffer = receiveMessage_1X.encode(null); + buffer.release(); + if (i > conversions / 2) { // I only validate half of the messages // to give it a chance of Races and Exceptions @@ -123,4 +127,79 @@ public class PropertiesConversionTest { Assert.assertEquals(0, errors.get()); } + + @Test + public void testMultiThreadChanges() throws Throwable { + CoreMessage coreMessage = new CoreMessage(1, 1024); + for (int i = 0; i < 10; i++) { + coreMessage.putBooleanProperty(SimpleString.toSimpleString("key1"), true); + } + coreMessage.putStringProperty(new SimpleString("_HQ_ORIG_ADDRESS"), SimpleString.toSimpleString("hqOne")); + coreMessage.putStringProperty(new SimpleString("_AMQ_ORIG_QUEUE"), SimpleString.toSimpleString("amqOne")); + coreMessage.putStringProperty(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), SimpleString.toSimpleString("asdfkhaksdjfhaskfdjhas")); + + int threads = 100; + int conversions = 100; + Thread[] t = new Thread[threads]; + CyclicBarrier barrier = new CyclicBarrier(threads); + AtomicInteger errors = new AtomicInteger(0); + AtomicInteger counts = new AtomicInteger(0); + + AtomicBoolean running = new AtomicBoolean(true); + + for (int i = 0; i < threads; i++) { + t[i] = new Thread() { + + @Override + public void run() { + try { + for (int i = 0; i < conversions; i++) { + counts.incrementAndGet(); + if (i == 0) { + barrier.await(); + } + + // heads or tails here, I need part of the messages with a big header, part of the messages with a small header + boolean heads = RandomUtil.randomBoolean(); + + // this is playing with a scenario where the horentq interceptor will change the size of the message + if (heads) { + coreMessage.putStringProperty("propChanges", "looooooooooooooooooooong property text"); + } else { + coreMessage.putStringProperty("propChanges", "short one"); + } + + heads = RandomUtil.randomBoolean(); + + if (heads) { + SessionReceiveMessage_1X receiveMessage_1X = new SessionReceiveMessage_1X(coreMessage); + ActiveMQBuffer buffer = receiveMessage_1X.encode(null); + buffer.release(); + } else { + SessionReceiveMessage receiveMessage = new SessionReceiveMessage(coreMessage); + ActiveMQBuffer buffer = receiveMessage.encode(null); + buffer.release(); + } + } + } catch (Throwable e) { + errors.incrementAndGet(); + e.printStackTrace(); + } + } + }; + t[i].start(); + } + + running.set(false); + + for (Thread thread : t) { + thread.join(); + } + + Assert.assertEquals(threads * conversions, counts.get()); + + Assert.assertEquals(0, errors.get()); + } + + } diff --git a/tests/compatibility-tests/src/main/resources/hqsoak/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/hqsoak/artemisServer.groovy new file mode 100644 index 0000000000..e51a9d0896 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqsoak/artemisServer.groovy @@ -0,0 +1,70 @@ +package servers + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.postoffice.impl.AddressImpl + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +import org.apache.activemq.artemis.core.server.JournalType +import org.apache.activemq.artemis.core.server.impl.AddressInfo +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy +import org.apache.activemq.artemis.core.settings.impl.AddressSettings +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String id = arg[1]; +String type = arg[2]; +String producer = arg[3]; +String consumer = arg[4]; +String globalMaxSize = arg[5]; + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/" + id)); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616?anycastPrefix=jms.queue.&multicastPrefix=jms.topic."); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(true); +try { + if (!type.startsWith("ARTEMIS-1")) { + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(false).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")) + .setExpiryAddress(SimpleString.toSimpleString("EXP"))); + if (globalMaxSize != null) { + configuration.getAddressesSettings().get("#").setPageSizeBytes(Long.parseLong(globalMaxSize)).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE) + configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize)); + } + } +} catch (Throwable e) { + // need to ignore this for 1.4 + e.printStackTrace(); +} + +jmsConfiguration = new JMSConfigurationImpl(); + +// used here even though it's deprecated to be compatible with older versions of the broker +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +AddressInfo info = new AddressInfo(SimpleString.toSimpleString("topic")).setAutoCreated(false).addRoutingType(RoutingType.MULTICAST); +server.activeMQServer.addAddressInfo(info); + diff --git a/tests/compatibility-tests/src/main/resources/hqsoak/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/hqsoak/receiveMessages.groovy new file mode 100644 index 0000000000..de2d37ad64 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqsoak/receiveMessages.groovy @@ -0,0 +1,105 @@ +package hqsoak + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +String clientType = arg[0]; +int consumers = Integer.parseInt(arg[1]) +final int messagesPerConsumer = Integer.parseInt(arg[2]) + +try { + legacyOption = legacy; +} catch (Throwable e) { + legacyOption = false; +} + + +if (legacyOption) { + queueName = "jms.queue.queue" + topicName = "jms.topic.topic" +} else { + queueName = "queue"; + topicName = "topic"; +} + +String textBody = "a rapadura e doce mas nao e mole nao"; + + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", "ARTEMIS"); +} else { + // Can't depend directly on hornetq, otherwise it wouldn't compile in artemis + GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); +} + +errorsConsumer = new AtomicInteger(0); +final AtomicInteger running = new AtomicInteger(0); +CountDownLatch latchStarted = new CountDownLatch(consumers); + +for (int i = 0; i < consumers; i++) { + Runnable r = new Runnable() { + @Override + void run() { + try { + int threadid = running.incrementAndGet(); + System.out.println("Running " + threadid) + Connection connection = cf.createConnection(); + connection.setClientID(clientType + threadid) + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + connection.start(); + latchStarted.countDown(); + MessageConsumer consumer = session.createDurableSubscriber(topic, "test") + + for (int m = 0; m < messagesPerConsumer; m++) { + TextMessage msg = consumer.receive(5000); + if (msg == null) { + errorsConsumer.incrementAndGet(); + System.err.println("Could not receive message") + break; + } + reusableLatch.countDown(); + } + connection.close(); + } + catch (Exception e) { + e.printStackTrace() + errorsConsumer.incrementAndGet(); + } finally { + running.decrementAndGet(); + } + } + } + Thread t = new Thread(r); + t.start(); +} +if (!latchStarted.await(10, TimeUnit.SECONDS)) { + System.err.prntln("Could not start consumers") + errorsConsumer.incrementAndGet() +} +System.out.println("Running::" + clientType + "::" + running) +return running; + diff --git a/tests/compatibility-tests/src/main/resources/hqsoak/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/hqsoak/sendMessages.groovy new file mode 100644 index 0000000000..08b05a9a42 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqsoak/sendMessages.groovy @@ -0,0 +1,109 @@ +package hqsoak + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +String clientType = arg[0]; +producers = Integer.parseInt(arg[1]) +numberOfMessages = Integer.parseInt(arg[2]) + +try { + legacyOption = legacy; +} catch (Throwable e) { + legacyOption = false; +} + + +if (legacyOption) { + queueName = "jms.queue.queue" + topicName = "jms.topic.topic" +} else { + queueName = "queue"; + topicName = "topic"; +} + +System.out.println("topicName = " + topicName); + + +if (clientType.startsWith("ARTEMIS")) { + // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", "ARTEMIS"); +} else { + // Can't depend directly on hornetq, otherwise it wouldn't compile in artemis + GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); +} + +errorsProducer = new AtomicInteger(0); +final AtomicInteger ran = new AtomicInteger(0); + +StringBuffer bufferStr = new StringBuffer(); +for (int i = 0; i < 200 * 1024; i++) { + bufferStr.append(" "); +} + +largeMessageBody = bufferStr.toString(); + +for (int i = 0; i < producers; i++) { + Runnable r = new Runnable() { + @Override + void run() { + try { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + + for (int m = 0; m < numberOfMessages; m++) { + + for (int j = 0; j < multiplyFactor; j++) { + reusableLatch.countUp() + } + + if (m % 10 == 0) { + producer.send(session.createTextMessage(largeMessageBody)); + //System.out.println("Sending regular ") + } else { + producer.send(session.createTextMessage("This is a regular message")); + //System.out.println("Sending large ") + } + + reusableLatch.await(10, TimeUnit.SECONDS) + } + + connection.close(); + } + catch (Exception e) { + e.printStackTrace() + errorsProducer.incrementAndGet(); + } finally { + ran.incrementAndGet(); + } + } + } + Thread t = new Thread(r); + t.start(); +} + +return ran; + diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HornetQSoakTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HornetQSoakTest.java new file mode 100644 index 0000000000..7f2326cce3 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HornetQSoakTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspath(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +public class HornetQSoakTest extends ClasspathBase { + + ClassLoader artemisClassLoader; + ClassLoader artemis1XClassLoader; + ClassLoader hornetqClassLoader; + + @Before + public void setUp() throws Throwable { + + this.artemisClassLoader = getClasspath(SNAPSHOT); + this.artemis1XClassLoader = getClasspath(ONE_FIVE); + this.hornetqClassLoader = getClasspath(HORNETQ_235); + + FileUtil.deleteDirectory(serverFolder.getRoot()); + setVariable(artemisClassLoader, "persistent", Boolean.FALSE); + startServer(serverFolder.getRoot(), artemisClassLoader, "live", null, true, "hqsoak/artemisServer.groovy", SNAPSHOT, SNAPSHOT, SNAPSHOT); + } + + @After + public void tearDown() throws Throwable { + if (artemisClassLoader != null && hornetqClassLoader != null) { + stopServer(artemisClassLoader); + } + } + + @Test + public void testSoakHornetQ() throws Throwable { + ReusableLatch reusableLatch = new ReusableLatch(0); + + int threadsProducerArtemis = 2; + int numberOfMessagesArtemis = 5; + int threadsProducerHornetQ = 2; + int numberOfMessagesHornetQ = 5; + + int numberOfConsumersArtemis = 10; + int numberOfConsumersArtemis1X = 5; + int numberOfConsumersHornetQ = 10; + + int multiplyFactor = numberOfConsumersArtemis + numberOfConsumersArtemis1X + numberOfConsumersHornetQ; + + setVariable(artemisClassLoader, "reusableLatch", reusableLatch); + setVariable(artemisClassLoader, "multiplyFactor", multiplyFactor); + setVariable(artemis1XClassLoader, "reusableLatch", reusableLatch); + setVariable(artemis1XClassLoader, "multiplyFactor", multiplyFactor); + setVariable(hornetqClassLoader, "reusableLatch", reusableLatch); + setVariable(hornetqClassLoader, "multiplyFactor", multiplyFactor); + + int totalMessagePerQueue = threadsProducerArtemis * numberOfMessagesArtemis + threadsProducerHornetQ * numberOfMessagesHornetQ; + + AtomicInteger runningConsumerArtemis = (AtomicInteger) evaluate(artemisClassLoader, "hqsoak/receiveMessages.groovy", "ARTEMIS", "" + numberOfConsumersArtemis, "" + totalMessagePerQueue); + AtomicInteger runningConsumer1XArtemis = (AtomicInteger) evaluate(artemis1XClassLoader, "hqsoak/receiveMessages.groovy", "ARTEMIS1X", "" + numberOfConsumersArtemis, "" + totalMessagePerQueue); + AtomicInteger runningConsumerHornetQ = (AtomicInteger) evaluate(hornetqClassLoader, "hqsoak/receiveMessages.groovy", "HORNETQ", "" + numberOfConsumersHornetQ, "" + totalMessagePerQueue); + + System.out.println("Running producers"); + + AtomicInteger ranArtemisProducer = (AtomicInteger) evaluate(artemisClassLoader, "hqsoak/sendMessages.groovy", "ARTEMIS", "" + threadsProducerArtemis, "" + numberOfMessagesArtemis); + AtomicInteger ranHornetQProducer = (AtomicInteger) evaluate(hornetqClassLoader, "hqsoak/sendMessages.groovy", "HORNETQ", "" + threadsProducerHornetQ, "" + numberOfMessagesHornetQ); + + Wait.assertEquals(threadsProducerArtemis, ranArtemisProducer::get, TimeUnit.MINUTES.toMillis(5), 100); + Wait.assertEquals(threadsProducerHornetQ, ranHornetQProducer::get, TimeUnit.MINUTES.toMillis(5), 100); + + Wait.assertEquals(0, runningConsumerArtemis::get); + Wait.assertEquals(0, runningConsumerHornetQ::get); + Wait.assertEquals(0, runningConsumer1XArtemis::get); + + checkErrors(artemisClassLoader, "errorsProducer"); + checkErrors(hornetqClassLoader, "errorsProducer"); + checkErrors(artemisClassLoader, "errorsConsumer"); + checkErrors(hornetqClassLoader, "errorsConsumer"); + checkErrors(artemis1XClassLoader, "errorsConsumer"); + + // execute(artemisClassLoader, ) + + } + + protected void checkErrors(ClassLoader loader, String variable) throws Throwable { + AtomicInteger errors = (AtomicInteger) execute(loader, "return " + variable); + Assert.assertEquals("the script finished with errors", 0, errors.get()); + } + +} diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/ClasspathBase.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/ClasspathBase.java index 0f821df6ab..3faabffc2c 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/ClasspathBase.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/ClasspathBase.java @@ -59,6 +59,23 @@ public class ClasspathBase { return new URLClassLoader(elements, null); } + protected static void startServer(File folder, + ClassLoader loader, + String serverName, + String globalMaxSize, + boolean setAddressSettings, + String scriptToUse, + String server, + String sender, + String receiver) throws Exception { + setVariable(loader, "setAddressSettings", setAddressSettings); + evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize); + } + + public static void stopServer(ClassLoader loader) throws Throwable { + execute(loader, "server.stop()"); + } + protected ClassLoader getClasspath(String name) throws Exception { return getClasspath(name, false); } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/VersionedBase.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/VersionedBase.java index 1ccbf303f2..9beb440ef3 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/VersionedBase.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/base/VersionedBase.java @@ -109,12 +109,7 @@ public abstract class VersionedBase extends ClasspathBase { scriptToUse = "servers/hornetqServer.groovy"; } - setVariable(loader, "setAddressSettings", setAddressSettings); - evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize); - } - - public void stopServer(ClassLoader loader) throws Throwable { - execute(loader, "server.stop()"); + startServer(folder, loader, serverName, globalMaxSize, setAddressSettings, scriptToUse, server, sender, receiver); } public String getServerScriptToUse() {