ARTEMIS-2207 Page Showing Log.warns for regular acked messages

This commit is contained in:
Clebert Suconic 2018-12-18 18:17:03 -05:00 committed by Justin Bertram
parent 969983cf1b
commit 40966c769a
2 changed files with 111 additions and 32 deletions

View File

@ -1284,6 +1284,10 @@ public final class PageSubscriptionImpl implements PageSubscription {
continue;
}
if (info != null && info.isAck(message.getPosition())) {
continue;
}
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() >= 0) {
PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage().getTransactionID());

View File

@ -71,6 +71,8 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
@ -277,6 +279,96 @@ public class PagingTest extends ActiveMQTestBase {
}
}
@Test
public void testPageTX() throws Exception {
AssertionLoggerHandler.startCapture();
try {
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 20 * 1024;
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.start();
final int numberOfBytes = 1024;
locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);
server.getPagingManager().getPageStore(ADDRESS).forceAnotherPage();
server.getPagingManager().getPageStore(ADDRESS).disableCleanup();
session.start();
ClientProducer producer = session.createProducer(ADDRESS);
ClientConsumer browserConsumer = session.createConsumer(ADDRESS.concat("-0"), true);
ClientMessage message = null;
for (int i = 0; i < 201; i++) {
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
for (int j = 1; j <= numberOfBytes; j++) {
message.getBodyBuffer().writeInt(j);
}
producer.send(message);
session.commit();
}
ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-0"));
session.start();
for (int i = 0; i < 201; i++) {
ClientMessage message2 = consumer.receive(10000);
Assert.assertNotNull(message2);
message2.acknowledge();
Assert.assertNotNull(message2);
session.commit();
}
consumer.close();
Queue queue = server.locateQueue(ADDRESS.concat("-0"));
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
PageCursorProvider provider = store.getCursorProvider();
PageSubscription cursorSubscription = provider.getSubscription(queue.getID());
PageIterator iterator = (PageIterator) cursorSubscription.iterator();
for (int i = 0; i < 5; i++) {
Assert.assertFalse(iterator.hasNext());
Assert.assertNull(browserConsumer.receiveImmediate());
}
session.close();
Assert.assertFalse(AssertionLoggerHandler.findText("Could not locate page"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222029"));
server.getPagingManager().getPageStore(ADDRESS).enableCleanup();
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging);
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test
public void testPageCleanup() throws Exception {
clearDataRecreateServerDirs();
@ -396,7 +488,6 @@ public class PagingTest extends ActiveMQTestBase {
System.out.println("pgComplete = " + pgComplete);
}
@Test
public void testPurge() throws Exception {
clearDataRecreateServerDirs();
@ -410,7 +501,7 @@ public class PagingTest extends ActiveMQTestBase {
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
QueueImpl purgeQueue = (QueueImpl)server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
@ -465,7 +556,6 @@ public class PagingTest extends ActiveMQTestBase {
connection.start();
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000);
Assert.assertNotNull(consumer.receive(5000));
session.commit();
@ -478,7 +568,6 @@ public class PagingTest extends ActiveMQTestBase {
StorageManager sm = server.getStorageManager();
for (int i = 0; i < 1000; i++) {
long tx = sm.generateID();
PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx);
@ -494,7 +583,6 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertEquals(0, server.getPagingManager().getTransactions().size());
}
// First page is complete but it wasn't deleted
@Test
public void testFirstPageCompleteNotDeleted() throws Exception {
@ -1741,17 +1829,13 @@ public class PagingTest extends ActiveMQTestBase {
for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("Before restart - message " + i + " is empty.",message);
Assert.assertNotNull("Before restart - message " + i + " is empty.", message);
message.acknowledge();
}
server.stop();
mainCleanup.set(false);
// Deleting the paging data. Simulating a failure
// a dumb user, or anything that will remove the data
deleteDirectory(new File(getPageDir()));
@ -1772,7 +1856,6 @@ public class PagingTest extends ActiveMQTestBase {
bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);
producer.send(message);
}
session.commit();
@ -1788,7 +1871,7 @@ public class PagingTest extends ActiveMQTestBase {
for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("After restart - message " + i + " is empty.",message);
Assert.assertNotNull("After restart - message " + i + " is empty.", message);
message.acknowledge();
}
@ -1852,7 +1935,6 @@ public class PagingTest extends ActiveMQTestBase {
session.commit();
session.close();
ArrayList<RecordInfo> records = new ArrayList<>();
List<PreparedTransactionInfo> list = new ArrayList<>();
@ -1865,9 +1947,7 @@ public class PagingTest extends ActiveMQTestBase {
// Delete everything from the journal
for (RecordInfo info : records) {
if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE &&
info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC &&
info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
jrn.appendDeleteRecord(info.id, false);
}
}
@ -3502,8 +3582,6 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
// Thread.sleep(5000);
}
@Test
@ -4775,9 +4853,7 @@ public class PagingTest extends ActiveMQTestBase {
} catch (Throwable e) {
log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") +
" with messageID=" +
message.getMessageID());
fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
}
}
@ -6452,7 +6528,6 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
}
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);