This commit is contained in:
Clebert Suconic 2020-12-22 10:39:41 -05:00
commit a0f464b816
12 changed files with 190 additions and 31 deletions

View File

@ -26,7 +26,7 @@ package org.apache.activemq.artemis.core.persistence;
public class PersisterIDs {
public static final int MAX_PERSISTERS = 4;
public static final int MAX_PERSISTERS = 5;
public static final byte CoreLargeMessagePersister_ID = (byte)0;
@ -38,4 +38,6 @@ public class PersisterIDs {
public static final byte AMQPLargeMessagePersister_ID = (byte)4;
public static final byte AMQPMessagePersisterV3_ID = (byte)5;
}

View File

@ -50,8 +50,7 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
@Override
public int getEncodeSize(Message record) {
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT +
DataConstants.SIZE_LONG; // expiration
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT;
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
@ -71,8 +70,6 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
buffer.writeInt(properties.getEncodeSize());
properties.encode(buffer.byteBuf());
}
buffer.writeLong(record.getExpiration());
}
@Override
@ -100,10 +97,6 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null);
}
record.reloadAddress(address);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
record.reloadExpiration(buffer.readLong());
}
return record;
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.utils.DataConstants;
import static org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV3_ID;
public class AMQPMessagePersisterV3 extends AMQPMessagePersisterV2 {
public static final byte ID = AMQPMessagePersisterV3_ID;
public static AMQPMessagePersisterV3 theInstance;
public static AMQPMessagePersisterV3 getInstance() {
if (theInstance == null) {
theInstance = new AMQPMessagePersisterV3();
}
return theInstance;
}
@Override
public byte getID() {
return ID;
}
public AMQPMessagePersisterV3() {
super();
}
@Override
public int getEncodeSize(Message record) {
int encodeSize = super.getEncodeSize(record) +
DataConstants.SIZE_LONG; // expiration
return encodeSize;
}
/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
super.encode(buffer, record);
buffer.writeLong(record.getExpiration());
}
@Override
public Message decode(ActiveMQBuffer buffer, Message ignore, CoreMessageObjectPools pool) {
Message record = super.decode(buffer, ignore, pool);
assert record != null && AMQPStandardMessage.class.equals(record.getClass());
((AMQPStandardMessage)record).reloadExpiration(buffer.readLong());
return record;
}
}

View File

@ -172,7 +172,7 @@ public class AMQPStandardMessage extends AMQPMessage {
@Override
public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
return AMQPMessagePersisterV2.getInstance();
return AMQPMessagePersisterV3.getInstance();
}
@Override

View File

@ -44,7 +44,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
@Override
public Persister<Message>[] getPersister() {
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance()};
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance(), AMQPLargeMessagePersister.getInstance(), AMQPMessagePersisterV3.getInstance()};
return persisters;
}

View File

@ -1559,7 +1559,7 @@ public class AMQPMessageTest {
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
try {
decoded.getPersister().encode(buffer, decoded);
Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
Assert.assertEquals(AMQPMessagePersisterV3.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
AMQPStandardMessage readMessage = (AMQPStandardMessage)decoded.getPersister().decode(buffer, null, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());

View File

@ -93,7 +93,7 @@ public class AMQPPersisterTest {
Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]);
MessagePersister persister = AMQPMessagePersisterV2.getInstance();
MessagePersister persister = AMQPMessagePersisterV3.getInstance();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
persister.encode(buffer, message);

View File

@ -423,6 +423,7 @@
<arg>org.apache.activemq:artemis-hornetq-protocol:2.4.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.4.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.4.0</arg>
<arg>org.apache.qpid:qpid-jms-client:0.26.0</arg>
<arg>org.codehaus.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>
@ -447,6 +448,7 @@
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.1.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.1.0</arg>
<arg>org.apache.qpid:qpid-jms-client:0.22.0</arg>
<arg>org.codehaus.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>
@ -471,6 +473,7 @@
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.0.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.0.0</arg>
<arg>org.apache.qpid:qpid-jms-client:0.20.0</arg>
<arg>org.codehaus.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>

View File

@ -0,0 +1,39 @@
package clients
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Create a client connection factory
// This differs from artemisClient.groovy as you can possibly use AMQP as the producer
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
if (serverArg[0].startsWith("HORNETQ")) {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100");
} else {
if ("AMQP".equals(serverArg[1])) {
cf = new org.apache.qpid.jms.JmsConnectionFactory("amqp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
} else {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
}
}
if (!"AMQP".equals(serverArg[1])) {
GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend());
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize());
}

View File

@ -24,18 +24,19 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnection
// Create a client connection factory
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit;
CountDownLatch latch = new CountDownLatch(1);
((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() {
@Override
void failoverEvent(FailoverEventType eventType) {
latch.countDown();
}
})
((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail"));
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
if (ActiveMQConnection.class.equals(connectionToFail.getClass())) {
CountDownLatch latch = new CountDownLatch(1);
((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() {
@Override
void failoverEvent(FailoverEventType eventType) {
latch.countDown();
}
})
((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail"));
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
}

View File

@ -25,6 +25,11 @@ import javax.jms.*
String serverType = arg[0];
String clientType = arg[1];
String operation = arg[2];
String protocol = null;
if (arg.length > 3) {
protocol = arg[3];
}
try {
legacyOption = legacy;
@ -61,7 +66,11 @@ String textBody = "a rapadura e doce mas nao e mole nao";
if (clientType.startsWith("ARTEMIS")) {
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
if (protocol != null && protocol.equals("AMQP")) {
GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol);
} else {
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
}
} else {
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
@ -96,14 +105,32 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
producer.send(bytesMessage);
for (int i = 0; i < 10; i++) {
BytesMessage m = session.createBytesMessage();
m.setIntProperty("count", i);
m.setIntProperty("order", 2 + i)
if ("AMQP".equals(protocol)) {
byte[] payload = new byte[LARGE_MESSAGE_SIZE];
m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE));
InputStream inputStream = createFakeLargeStream(LARGE_MESSAGE_SIZE);
inputStream.read(payload);
inputStream.close();
producer.send(m);
for (int i = 0; i < 10; i++) {
BytesMessage m = session.createBytesMessage();
m.setIntProperty("count", i);
m.setIntProperty("order", 2 + i)
m.writeBytes(payload);
producer.send(m);
}
} else {
for (int i = 0; i < 10; i++) {
BytesMessage m = session.createBytesMessage();
m.setIntProperty("count", i);
m.setIntProperty("order", 2 + i)
m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE));
producer.send(m);
}
}
ObjectMessage objMessage = session.createObjectMessage("rapadura");
@ -175,7 +202,7 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
textMessage.setStringProperty("inMessageId", variableSize.toString());
newProducer.send(textMessage);
newSession.commit();
}
}
newSession.commit();
newSession.close();

View File

@ -127,6 +127,23 @@ public class JournalCompatibilityTest extends VersionedBase {
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
}
@Test
public void testSendReceiveAMQPPaging() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", null, true);
evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages", "AMQP");
evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", null, false);
evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages", "AMQP");
}
/**
* Test that the server starts properly using an old journal even though persistent size
* metrics were not originaly stored