From f6ef285859347a57749ac4e5881d879df23e4f88 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 22 Oct 2020 13:32:19 -0400 Subject: [PATCH] ARTEMIS-2927 LVQ broken after restart --- .../core/server/impl/LastValueQueue.java | 18 ++++++++++ .../tests/integration/server/LVQTest.java | 34 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 3cb3d09d0e..a93d27ceb3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -285,6 +285,19 @@ public class LastValueQueue extends QueueImpl { super.acknowledge(tx, ref, reason, consumer); } + @Override + public synchronized void reload(final MessageReference ref) { + // repopulate LVQ map & reload proper HolderReferences + SimpleString lastValueProp = ref.getLastValueProperty(); + if (lastValueProp != null) { + HolderReference hr = new HolderReference(lastValueProp, ref); + map.put(lastValueProp, hr); + super.reload(hr); + } else { + super.reload(ref); + } + } + private synchronized void removeIfCurrent(MessageReference ref) { SimpleString lastValueProp = ref.getLastValueProperty(); if (lastValueProp != null) { @@ -539,6 +552,11 @@ public class LastValueQueue extends QueueImpl { return ref.getPersistentSize(); } + @Override + public String toString() { + return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString(); + } + @Override public PagingStore getOwner() { return ref.getOwner(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index d35a96b2a2..b040f6bc7c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -76,6 +76,40 @@ public class LVQTest extends ActiveMQTestBase { Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); } + @Test + public void testSimpleRestart() throws Exception { + ClientProducer producer = clientSession.createProducer(address); + ClientMessage m1 = createTextMessage(clientSession, "m1"); + SimpleString rh = new SimpleString("SMID1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + producer.send(m1); + ClientMessage m2 = createTextMessage(clientSession, "m2"); + m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + producer.send(m2); + assertEquals(1, server.locateQueue(qName1).getMessageCount()); + clientSession.close(); + + server.stop(); + server.start(); + + assertEquals(1, server.locateQueue(qName1).getMessageCount()); + ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); + ClientSessionFactory sf = createSessionFactory(locator); + clientSession = addClientSession(sf.createSession(false, true, true)); + producer = clientSession.createProducer(address); + ClientMessage m3 = createTextMessage(clientSession, "m3"); + m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + producer.send(m3); + assertEquals(1, server.locateQueue(qName1).getMessageCount()); + + ClientConsumer consumer = clientSession.createConsumer(qName1); + clientSession.start(); + ClientMessage m = consumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals("m3", m.getBodyBuffer().readString()); + } + @Test public void testMultipleMessages() throws Exception { ClientProducer producer = clientSession.createProducer(address);