ARTEMIS-2669 not durable AMQP messages cannot became durable on depaging
This commit is contained in:
parent
f94a0113c3
commit
085a74cf06
|
@ -836,15 +836,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
scanMessageData();
|
scanMessageData();
|
||||||
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
|
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
|
||||||
modified = false;
|
modified = false;
|
||||||
|
|
||||||
// Message state should reflect that is came from persistent storage which
|
|
||||||
// can happen when moved to a durable location. We must re-encode here to
|
|
||||||
// avoid a subsequent redelivery from suddenly appearing with a durable header
|
|
||||||
// tag when the initial delivery did not.
|
|
||||||
if (!isDurable()) {
|
|
||||||
setDurable(true);
|
|
||||||
reencode();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -16,9 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.amqp.paging;
|
package org.apache.activemq.artemis.tests.integration.amqp.paging;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||||
|
@ -30,9 +33,23 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class AmqpPagingTest extends AmqpClientTestSupport {
|
public class AmqpPagingTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "durability={0}")
|
||||||
|
public static Collection getParams() {
|
||||||
|
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}, {null}});
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Boolean durable;
|
||||||
|
|
||||||
|
public AmqpPagingTest(Boolean durable) {
|
||||||
|
this.durable = durable;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void addConfiguration(ActiveMQServer server) {
|
protected void addConfiguration(ActiveMQServer server) {
|
||||||
super.addConfiguration(server);
|
super.addConfiguration(server);
|
||||||
|
@ -64,13 +81,18 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
|
||||||
for (int i = 0; i < MSG_COUNT; i++) {
|
for (int i = 0; i < MSG_COUNT; i++) {
|
||||||
AmqpMessage message = new AmqpMessage();
|
AmqpMessage message = new AmqpMessage();
|
||||||
message.setText(data);
|
message.setText(data);
|
||||||
|
if (durable != null) {
|
||||||
|
message.setDurable(durable);
|
||||||
|
}
|
||||||
sender.send(message);
|
sender.send(message);
|
||||||
}
|
}
|
||||||
|
Assert.assertTrue(server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName())).isPaging());
|
||||||
sender.close();
|
sender.close();
|
||||||
receiver.flow(MSG_COUNT);
|
receiver.flow(MSG_COUNT);
|
||||||
for (int i = 0; i < MSG_COUNT; i++) {
|
for (int i = 0; i < MSG_COUNT; i++) {
|
||||||
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
|
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
|
||||||
assertNotNull("Not received anything after " + i + " receive", receive);
|
assertNotNull("Not received anything after " + i + " receive", receive);
|
||||||
|
Assert.assertEquals(durable == null ? false : durable.booleanValue(), receive.isDurable());
|
||||||
receive.accept();
|
receive.accept();
|
||||||
}
|
}
|
||||||
receiver.close();
|
receiver.close();
|
||||||
|
|
Loading…
Reference in New Issue