ARTEMIS-2927 LVQ broken after restart
This commit is contained in:
parent
eea95e4a5a
commit
f6ef285859
|
@ -285,6 +285,19 @@ public class LastValueQueue extends QueueImpl {
|
|||
super.acknowledge(tx, ref, reason, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reload(final MessageReference ref) {
|
||||
// repopulate LVQ map & reload proper HolderReferences
|
||||
SimpleString lastValueProp = ref.getLastValueProperty();
|
||||
if (lastValueProp != null) {
|
||||
HolderReference hr = new HolderReference(lastValueProp, ref);
|
||||
map.put(lastValueProp, hr);
|
||||
super.reload(hr);
|
||||
} else {
|
||||
super.reload(ref);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void removeIfCurrent(MessageReference ref) {
|
||||
SimpleString lastValueProp = ref.getLastValueProperty();
|
||||
if (lastValueProp != null) {
|
||||
|
@ -539,6 +552,11 @@ public class LastValueQueue extends QueueImpl {
|
|||
return ref.getPersistentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PagingStore getOwner() {
|
||||
return ref.getOwner();
|
||||
|
|
|
@ -76,6 +76,40 @@ public class LVQTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleRestart() throws Exception {
|
||||
ClientProducer producer = clientSession.createProducer(address);
|
||||
ClientMessage m1 = createTextMessage(clientSession, "m1");
|
||||
SimpleString rh = new SimpleString("SMID1");
|
||||
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
|
||||
producer.send(m1);
|
||||
ClientMessage m2 = createTextMessage(clientSession, "m2");
|
||||
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
|
||||
producer.send(m2);
|
||||
assertEquals(1, server.locateQueue(qName1).getMessageCount());
|
||||
clientSession.close();
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
assertEquals(1, server.locateQueue(qName1).getMessageCount());
|
||||
ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
clientSession = addClientSession(sf.createSession(false, true, true));
|
||||
producer = clientSession.createProducer(address);
|
||||
ClientMessage m3 = createTextMessage(clientSession, "m3");
|
||||
m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
|
||||
producer.send(m3);
|
||||
assertEquals(1, server.locateQueue(qName1).getMessageCount());
|
||||
|
||||
ClientConsumer consumer = clientSession.createConsumer(qName1);
|
||||
clientSession.start();
|
||||
ClientMessage m = consumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals("m3", m.getBodyBuffer().readString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleMessages() throws Exception {
|
||||
ClientProducer producer = clientSession.createProducer(address);
|
||||
|
|
Loading…
Reference in New Issue