This commit is contained in:
Clebert Suconic 2019-01-17 15:39:31 -05:00
commit 4695bfb34a
6 changed files with 168 additions and 30 deletions

View File

@ -964,6 +964,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// computing the delete should be done after compacting is done
if (record == null) {
compactor.addCommandDelete(id, usedFile);
// JournalImplTestUni::testDoubleDelete was written to validate this condition:
if (compactor == null) {
logger.debug("Record " + id + " had been deleted already from a different call");
} else {
compactor.addCommandDelete(id, usedFile);
}
} else {
record.delete(usedFile);
}
@ -974,6 +980,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} catch (Throwable e) {
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
setErrorCondition(callback, null, e);
} finally {
journalLock.readLock().unlock();
}

View File

@ -1654,12 +1654,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
// Package protected ---------------------------------------------
protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
}
}

View File

@ -454,15 +454,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// This should be accessed from this package only
void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws ActiveMQException {
if (largeServerMessage.getPendingRecordID() < 0) {
try {
// The delete file happens asynchronously
// And the client won't be waiting for the actual file to be deleted.
// We set a temporary record (short lived) on the journal
// to avoid a situation where the server is restarted and pending large message stays on forever
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() < 0) {
try {
// The delete file happens asynchronously
// And the client won't be waiting for the actual file to be deleted.
// We set a temporary record (short lived) on the journal
// to avoid a situation where the server is restarted and pending large message stays on forever
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);
}
}
}
final SequentialFile file = largeServerMessage.getFile();

View File

@ -1364,13 +1364,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
*/
private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception {
LargeServerMessage largeServerMessage = (LargeServerMessage) message;
if (largeServerMessage.getPendingRecordID() >= 0) {
if (tx == null) {
storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
} else {
storageManager.confirmPendingLargeMessageTX(tx, largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID());
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
if (tx == null) {
storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
} else {
storageManager.confirmPendingLargeMessageTX(tx, largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID());
}
largeServerMessage.setPendingRecordID(-1);
}
largeServerMessage.setPendingRecordID(-1);
}
}

View File

@ -23,9 +23,15 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Map;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -36,6 +42,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
}
public SimpleString lmAddress = new SimpleString("LargeMessageAddress");
public SimpleString lmDropAddress = new SimpleString("LargeMessageDropAddress");
@Override
@Before
@ -43,6 +50,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
this.realStore = true;
super.setUp();
server.createQueue(lmAddress, RoutingType.ANYCAST, lmAddress, null, true, false);
server.createQueue(lmDropAddress, RoutingType.ANYCAST, lmDropAddress, null, true, false);
}
@Test
@ -62,6 +70,18 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
}
}
@Override
protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
addressSettingsMap.put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true));
addressSettingsMap.put(lmDropAddress.toString(),
new AddressSettings()
.setMaxSizeBytes(15 * 1024 * 1024)
.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP)
.setMessageCounterHistoryDayLimit(10)
.setRedeliveryDelay(0)
.setMaxDeliveryAttempts(0));
}
@Test
public void testSendReceiveLargeMessage() throws Exception {
// Create 1MB Message
@ -103,4 +123,53 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
assertArrayEquals(body, bytes);
}
}
@Test
public void testFastLargeMessageProducerDropOnPaging() throws Exception {
AssertionLoggerHandler.startCapture();
try {
// Create 100K Message
int size = 100 * 1024;
final byte[] bytes = new byte[size];
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(lmDropAddress.toString());
try (MessageProducer producer = session.createProducer(queue)) {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
bytes[0] = 1;
BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
final PagingStore pageStore = server.getPagingManager().getPageStore(lmDropAddress);
while (!pageStore.isPaging()) {
producer.send(message);
}
for (int i = 0; i < 10; i++) {
producer.send(message);
}
final long messageCount = server.locateQueue(lmDropAddress).getMessageCount();
Assert.assertTrue("The queue cannot be empty", messageCount > 0);
try (MessageConsumer messageConsumer = session.createConsumer(queue)) {
for (long m = 0; m < messageCount; m++) {
if (messageConsumer.receive(2000) == null) {
Assert.fail("The messages are not finished yet");
}
}
}
}
}
}
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException"));
Assert.assertFalse(AssertionLoggerHandler.findText("It was not possible to delete message"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -27,6 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -437,7 +440,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
/**
* Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize, numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN);
*/
private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception {
private int calculateNumberOfFiles(TestableJournal journal,
final int fileSize,
final int alignment,
final int... record) throws Exception {
if (journal != null) {
journal.flush();
}
@ -1413,8 +1419,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testCommitRecordsInFileNoReclaim() throws Exception {
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) +
512, true);
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
createJournal();
startJournal();
load();
@ -1497,8 +1502,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testRollbackRecordsInFileNoReclaim() throws Exception {
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) +
512, true);
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
createJournal();
startJournal();
load();
@ -1589,8 +1593,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testEmptyPrepare() throws Exception {
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) +
512, true);
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
createJournal();
startJournal();
load();
@ -1624,8 +1627,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testPrepareNoReclaim() throws Exception {
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) +
512, true);
setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true);
createJournal();
startJournal();
load();
@ -1998,6 +2000,60 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
loadAndCheck();
}
@Test
public void testDoubleDelete() throws Exception {
AssertionLoggerHandler.startCapture();
try {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
byte[] record = generateRecord(100);
add(1);
// I'm not adding that to the test assertion, as it will be deleted anyway.
// the test assertion doesn't support multi-thread, so I'm calling the journal directly here
journal.appendAddRecord(2, (byte) 0, record, sync);
Thread[] threads = new Thread[100];
CountDownLatch alignLatch = new CountDownLatch(threads.length);
CountDownLatch startFlag = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
alignLatch.countDown();
try {
startFlag.await(5, TimeUnit.SECONDS);
journal.appendDeleteRecord(2, false);
} catch (java.lang.IllegalStateException expected) {
} catch (Exception e) {
e.printStackTrace();
}
});
threads[i].start();
}
Assert.assertTrue(alignLatch.await(5, TimeUnit.SECONDS));
startFlag.countDown();
for (Thread t : threads) {
t.join(TimeUnit.SECONDS.toMillis(10));
Assert.assertFalse(t.isAlive());
}
journal.flush();
Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException"));
stopJournal();
createJournal();
startJournal();
loadAndCheck();
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test
public void testMultipleAddUpdateAll() throws Exception {
setup(10, 10 * 1024, true);