This commit is contained in:
Clebert Suconic 2020-03-22 17:46:37 -04:00
commit f44803deaa
2 changed files with 22 additions and 9 deletions

View File

@ -836,15 +836,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
scanMessageData();
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
modified = false;
// Message state should reflect that is came from persistent storage which
// can happen when moved to a durable location. We must re-encode here to
// avoid a subsequent redelivery from suddenly appearing with a durable header
// tag when the initial delivery did not.
if (!isDurable()) {
setDurable(true);
reencode();
}
}
@Override

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp.paging;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@ -30,9 +33,23 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AmqpPagingTest extends AmqpClientTestSupport {
@Parameterized.Parameters(name = "durability={0}")
public static Collection getParams() {
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}, {null}});
}
private final Boolean durable;
public AmqpPagingTest(Boolean durable) {
this.durable = durable;
}
@Override
protected void addConfiguration(ActiveMQServer server) {
super.addConfiguration(server);
@ -64,13 +81,18 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
message.setText(data);
if (durable != null) {
message.setDurable(durable);
}
sender.send(message);
}
Assert.assertTrue(server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName())).isPaging());
sender.close();
receiver.flow(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Not received anything after " + i + " receive", receive);
Assert.assertEquals(durable == null ? false : durable.booleanValue(), receive.isDurable());
receive.accept();
}
receiver.close();