From 92d6ae87edec0375e24a53f37ad67d595a8511b1 Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Tue, 22 Dec 2020 00:17:41 +0100 Subject: [PATCH] ARTEMIS-3027 Fixing AMQP persister encoding --- .../core/persistence/PersisterIDs.java | 4 +- .../amqp/broker/AMQPMessagePersisterV2.java | 9 +-- .../amqp/broker/AMQPMessagePersisterV3.java | 77 +++++++++++++++++++ .../amqp/broker/AMQPStandardMessage.java | 2 +- .../broker/ProtonProtocolManagerFactory.java | 2 +- .../protocol/amqp/broker/AMQPMessageTest.java | 2 +- .../amqp/broker/AMQPPersisterTest.java | 2 +- tests/compatibility-tests/pom.xml | 3 + .../clients/artemisClientAMQP.groovy | 39 ++++++++++ .../main/resources/clients/artemisFail.groovy | 21 ++--- .../resources/meshTest/sendMessages.groovy | 43 +++++++++-- .../JournalCompatibilityTest.java | 17 ++++ 12 files changed, 190 insertions(+), 31 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java create mode 100644 tests/compatibility-tests/src/main/resources/clients/artemisClientAMQP.groovy diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java index d51accb678..57440552cf 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/PersisterIDs.java @@ -26,7 +26,7 @@ package org.apache.activemq.artemis.core.persistence; public class PersisterIDs { - public static final int MAX_PERSISTERS = 4; + public static final int MAX_PERSISTERS = 5; public static final byte CoreLargeMessagePersister_ID = (byte)0; @@ -38,4 +38,6 @@ public class PersisterIDs { public static final byte AMQPLargeMessagePersister_ID = (byte)4; + public static final byte AMQPMessagePersisterV3_ID = (byte)5; + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java index 638244df5f..bd7ba0ed21 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java @@ -50,8 +50,7 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { @Override public int getEncodeSize(Message record) { - int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT + - DataConstants.SIZE_LONG; // expiration + int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT; TypedProperties properties = ((AMQPMessage)record).getExtraProperties(); @@ -71,8 +70,6 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { buffer.writeInt(properties.getEncodeSize()); properties.encode(buffer.byteBuf()); } - - buffer.writeLong(record.getExpiration()); } @Override @@ -100,10 +97,6 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null); } record.reloadAddress(address); - - if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { - record.reloadExpiration(buffer.readLong()); - } return record; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java new file mode 100644 index 0000000000..6fcf3ad2a1 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV3.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.broker; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; +import org.apache.activemq.artemis.utils.DataConstants; + +import static org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV3_ID; + +public class AMQPMessagePersisterV3 extends AMQPMessagePersisterV2 { + + public static final byte ID = AMQPMessagePersisterV3_ID; + + public static AMQPMessagePersisterV3 theInstance; + + public static AMQPMessagePersisterV3 getInstance() { + if (theInstance == null) { + theInstance = new AMQPMessagePersisterV3(); + } + return theInstance; + } + + @Override + public byte getID() { + return ID; + } + + public AMQPMessagePersisterV3() { + super(); + } + + + @Override + public int getEncodeSize(Message record) { + int encodeSize = super.getEncodeSize(record) + + DataConstants.SIZE_LONG; // expiration + return encodeSize; + } + + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + super.encode(buffer, record); + + buffer.writeLong(record.getExpiration()); + } + + @Override + public Message decode(ActiveMQBuffer buffer, Message ignore, CoreMessageObjectPools pool) { + Message record = super.decode(buffer, ignore, pool); + + assert record != null && AMQPStandardMessage.class.equals(record.getClass()); + + ((AMQPStandardMessage)record).reloadExpiration(buffer.readLong()); + + return record; + } + +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index 93601389f1..d5d42d8d9c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -172,7 +172,7 @@ public class AMQPStandardMessage extends AMQPMessage { @Override public Persister getPersister() { - return AMQPMessagePersisterV2.getInstance(); + return AMQPMessagePersisterV3.getInstance(); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java index ba15817884..e0b6a2b9e1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java @@ -44,7 +44,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory @Override public Persister[] getPersister() { - Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance()}; + Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance()}; return persisters; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 57e7d9be46..fbef2fec20 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -1559,7 +1559,7 @@ public class AMQPMessageTest { ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024); try { decoded.getPersister().encode(buffer, decoded); - Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister + Assert.assertEquals(AMQPMessagePersisterV3.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister AMQPStandardMessage readMessage = (AMQPStandardMessage)decoded.getPersister().decode(buffer, null, null); Assert.assertEquals(33, readMessage.getMessageID()); Assert.assertEquals("someAddress", readMessage.getAddress()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java index cfb2b4f5b6..aa630b15b0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java @@ -93,7 +93,7 @@ public class AMQPPersisterTest { Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]); - MessagePersister persister = AMQPMessagePersisterV2.getInstance(); + MessagePersister persister = AMQPMessagePersisterV3.getInstance(); ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); persister.encode(buffer, message); diff --git a/tests/compatibility-tests/pom.xml b/tests/compatibility-tests/pom.xml index c63d89c3f6..e3fb14460c 100644 --- a/tests/compatibility-tests/pom.xml +++ b/tests/compatibility-tests/pom.xml @@ -423,6 +423,7 @@ org.apache.activemq:artemis-hornetq-protocol:2.4.0 org.apache.activemq:artemis-amqp-protocol:2.4.0 org.apache.activemq:artemis-hornetq-protocol:2.4.0 + org.apache.qpid:qpid-jms-client:0.26.0 org.codehaus.groovy:groovy-all:pom:${groovy.version} @@ -447,6 +448,7 @@ org.apache.activemq:artemis-hornetq-protocol:2.1.0 org.apache.activemq:artemis-amqp-protocol:2.1.0 org.apache.activemq:artemis-hornetq-protocol:2.1.0 + org.apache.qpid:qpid-jms-client:0.22.0 org.codehaus.groovy:groovy-all:pom:${groovy.version} @@ -471,6 +473,7 @@ org.apache.activemq:artemis-hornetq-protocol:2.0.0 org.apache.activemq:artemis-amqp-protocol:2.0.0 org.apache.activemq:artemis-hornetq-protocol:2.0.0 + org.apache.qpid:qpid-jms-client:0.20.0 org.codehaus.groovy:groovy-all:pom:${groovy.version} diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisClientAMQP.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisClientAMQP.groovy new file mode 100644 index 0000000000..5cf63c135d --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisClientAMQP.groovy @@ -0,0 +1,39 @@ +package clients +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Create a client connection factory + +// This differs from artemisClient.groovy as you can possibly use AMQP as the producer + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.compatibility.GroovyRun; + +if (serverArg[0].startsWith("HORNETQ")) { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100"); +} else { + if ("AMQP".equals(serverArg[1])) { + cf = new org.apache.qpid.jms.JmsConnectionFactory("amqp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100"); + } else { + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100"); + } +} + +if (!"AMQP".equals(serverArg[1])) { + GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend()); + GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize()); +} diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy index 100a6e9dc6..6a917b78bb 100644 --- a/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy +++ b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy @@ -24,18 +24,19 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnection // Create a client connection factory -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory import org.apache.activemq.artemis.tests.compatibility.GroovyRun import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit; -CountDownLatch latch = new CountDownLatch(1); -((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() { - @Override - void failoverEvent(FailoverEventType eventType) { - latch.countDown(); - } -}) -((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail")); -GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); +if (ActiveMQConnection.class.equals(connectionToFail.getClass())) { + CountDownLatch latch = new CountDownLatch(1); + ((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() { + @Override + void failoverEvent(FailoverEventType eventType) { + latch.countDown(); + } + }) + ((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail")); + GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); +} diff --git a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy index 87e8027cc1..054c925ef7 100644 --- a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy +++ b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy @@ -25,6 +25,11 @@ import javax.jms.* String serverType = arg[0]; String clientType = arg[1]; String operation = arg[2]; +String protocol = null; + +if (arg.length > 3) { + protocol = arg[3]; +} try { legacyOption = legacy; @@ -61,7 +66,11 @@ String textBody = "a rapadura e doce mas nao e mole nao"; if (clientType.startsWith("ARTEMIS")) { // Can't depend directly on artemis, otherwise it wouldn't compile in hornetq - GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType); + if (protocol != null && protocol.equals("AMQP")) { + GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol); + } else { + GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType); + } } else { // Can't depend directly on hornetq, otherwise it wouldn't compile in artemis GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); @@ -96,14 +105,32 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) { producer.send(bytesMessage); - for (int i = 0; i < 10; i++) { - BytesMessage m = session.createBytesMessage(); - m.setIntProperty("count", i); - m.setIntProperty("order", 2 + i) + if ("AMQP".equals(protocol)) { + byte[] payload = new byte[LARGE_MESSAGE_SIZE]; - m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE)); + InputStream inputStream = createFakeLargeStream(LARGE_MESSAGE_SIZE); + inputStream.read(payload); + inputStream.close(); - producer.send(m); + for (int i = 0; i < 10; i++) { + BytesMessage m = session.createBytesMessage(); + m.setIntProperty("count", i); + m.setIntProperty("order", 2 + i) + + m.writeBytes(payload); + + producer.send(m); + } + } else { + for (int i = 0; i < 10; i++) { + BytesMessage m = session.createBytesMessage(); + m.setIntProperty("count", i); + m.setIntProperty("order", 2 + i) + + m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE)); + + producer.send(m); + } } ObjectMessage objMessage = session.createObjectMessage("rapadura"); @@ -175,7 +202,7 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) { textMessage.setStringProperty("inMessageId", variableSize.toString()); newProducer.send(textMessage); newSession.commit(); - } + } newSession.commit(); newSession.close(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java index a61bd3d9ea..9f9e02d291 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java @@ -127,6 +127,23 @@ public class JournalCompatibilityTest extends VersionedBase { evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); } + @Test + public void testSendReceiveAMQPPaging() throws Throwable { + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true); + evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy"); + evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages", "AMQP"); + evaluate(senderClassloader, "journalcompatibility/ispaging.groovy"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false); + evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy"); + + setVariable(receiverClassloader, "latch", null); + evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages", "AMQP"); + } + /** * Test that the server starts properly using an old journal even though persistent size * metrics were not originaly stored