This commit is contained in:
Clebert Suconic 2020-10-22 13:41:19 -04:00
commit aa975ad47c
2 changed files with 52 additions and 0 deletions

View File

@ -285,6 +285,19 @@ public class LastValueQueue extends QueueImpl {
super.acknowledge(tx, ref, reason, consumer); super.acknowledge(tx, ref, reason, consumer);
} }
@Override
public synchronized void reload(final MessageReference ref) {
// repopulate LVQ map & reload proper HolderReferences
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
HolderReference hr = new HolderReference(lastValueProp, ref);
map.put(lastValueProp, hr);
super.reload(hr);
} else {
super.reload(ref);
}
}
private synchronized void removeIfCurrent(MessageReference ref) { private synchronized void removeIfCurrent(MessageReference ref) {
SimpleString lastValueProp = ref.getLastValueProperty(); SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) { if (lastValueProp != null) {
@ -539,6 +552,11 @@ public class LastValueQueue extends QueueImpl {
return ref.getPersistentSize(); return ref.getPersistentSize();
} }
@Override
public String toString() {
return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
}
@Override @Override
public PagingStore getOwner() { public PagingStore getOwner() {
return ref.getOwner(); return ref.getOwner();

View File

@ -76,6 +76,40 @@ public class LVQTest extends ActiveMQTestBase {
Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
} }
@Test
public void testSimpleRestart() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
ClientMessage m1 = createTextMessage(clientSession, "m1");
SimpleString rh = new SimpleString("SMID1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m1);
ClientMessage m2 = createTextMessage(clientSession, "m2");
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m2);
assertEquals(1, server.locateQueue(qName1).getMessageCount());
clientSession.close();
server.stop();
server.start();
assertEquals(1, server.locateQueue(qName1).getMessageCount());
ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
ClientSessionFactory sf = createSessionFactory(locator);
clientSession = addClientSession(sf.createSession(false, true, true));
producer = clientSession.createProducer(address);
ClientMessage m3 = createTextMessage(clientSession, "m3");
m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m3);
assertEquals(1, server.locateQueue(qName1).getMessageCount());
ClientConsumer consumer = clientSession.createConsumer(qName1);
clientSession.start();
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals("m3", m.getBodyBuffer().readString());
}
@Test @Test
public void testMultipleMessages() throws Exception { public void testMultipleMessages() throws Exception {
ClientProducer producer = clientSession.createProducer(address); ClientProducer producer = clientSession.createProducer(address);