ARTEMIS-2092 fix first page messages lost on server crash bug

This commit is contained in:
yang wei 2018-09-19 17:42:02 +08:00 committed by Clebert Suconic
parent 94bd8ed77e
commit d868563e7b
2 changed files with 148 additions and 1 deletions

View File

@ -1155,7 +1155,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
pendingCountEncoding.decode(buff);
pendingCountEncoding.setID(record.id);
PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.notEmpty();
}
// This can be null on testcases not interested on this outcome
if (pendingNonTXPageCounter != null) {
pendingNonTXPageCounter.add(pendingCountEncoding);

View File

@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -62,16 +63,20 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
@ -6076,6 +6081,145 @@ public class PagingTest extends ActiveMQTestBase {
}
@Test
public void testOnlyOnePageOnServerCrash() throws Throwable {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
class NonStoppablePagingStoreImpl extends PagingStoreImpl {
NonStoppablePagingStoreImpl(SimpleString address,
ScheduledExecutorService scheduledExecutor,
long syncTimeout,
PagingManager pagingManager,
StorageManager storageManager,
SequentialFileFactory fileFactory,
PagingStoreFactory storeFactory,
SimpleString storeName,
AddressSettings addressSettings,
ArtemisExecutor executor,
boolean syncNonTransactional) {
super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
}
/**
* Normal stopping will cleanup non tx page subscription counter which will not trigger the bug.
* Here we override stop to simulate server crash.
* @throws Exception
*/
@Override
public synchronized void stop() throws Exception {
}
}
if (storeType == StoreConfiguration.StoreType.DATABASE) {
server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception {
return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
@Override
public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.syncNonTransactional);
}
};
}
};
} else {
server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
@Override
public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
}
};
}
};
}
addServer(server);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
// Here we send some messages to ensure the queue start paging and create only one page
final int numberOfMessages = 12;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
message.putIntProperty("count", i);
producer.send(message);
}
producer.close();
session.close();
Queue queue = server.locateQueue(PagingTest.ADDRESS);
assertEquals(numberOfMessages, getMessageCount(queue));
assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
sf.close();
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE + MESSAGE_SIZE);
server.start();
sf = createSessionFactory(locator);
session = sf.createSession(false, false, false);
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
for (int i = 0; i < numberOfMessages; i++) {
ClientMessage msg = consumer.receive(1000);
assertNotNull(i + "th msg is null", msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
System.out.println(msg);
}
assertNull(consumer.receiveImmediate());
session.commit();
session.close();
sf.close();
locator.close();
server.stop();
}
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);