ARTEMIS-2701 Improving DLQ/check over previously removed records

This commit is contained in:
Clebert Suconic 2020-04-02 17:38:12 -04:00
parent ae17fd6552
commit af796d5ce4
22 changed files with 581 additions and 60 deletions

View File

@ -501,6 +501,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
appendRecord(r);
}
@Override
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, record, sync);
return true;
}
@Override
public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
@ -516,6 +522,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
appendRecord(r);
}
@Override
public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, persister, record, sync);
return true;
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
@ -539,6 +551,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
appendRecord(r);
}
@Override
public boolean tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
appendUpdateRecord(id, recordType, persister, record, sync, completionCallback);
return true;
}
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception {
checkStatus();
@ -553,6 +578,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
appendRecord(r);
}
@Override
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
appendDeleteRecord(id, sync);
return true;
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
checkStatus(completionCallback);
@ -569,6 +600,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
appendRecord(r);
}
@Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
appendDeleteRecord(id, sync, completionCallback);
return true;
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
checkStatus();

View File

@ -85,12 +85,20 @@ public interface Journal extends ActiveMQComponent {
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
}
default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
}
void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
default void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
@ -99,6 +107,14 @@ public interface Journal extends ActiveMQComponent {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
}
default boolean tryAppendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
}
void appendUpdateRecord(long id,
byte recordType,
Persister persister,
@ -106,10 +122,21 @@ public interface Journal extends ActiveMQComponent {
boolean sync,
IOCompletion callback) throws Exception;
boolean tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws Exception;
void appendDeleteRecord(long id, boolean sync) throws Exception;
boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception;
void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;

View File

@ -172,6 +172,13 @@ public final class FileWrapperJournal extends JournalBase {
writeRecord(deleteRecord, false, -1, false, callback);
}
@Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
appendDeleteRecord(id, sync, callback);
return true;
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
@ -199,6 +206,18 @@ public final class FileWrapperJournal extends JournalBase {
writeRecord(updateRecord, false, -1, false, callback);
}
@Override
public boolean tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws Exception {
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
writeRecord(updateRecord, false, -1, false, callback);
return true;
}
@Override
public void appendUpdateRecordTransactional(long txID,
long id,

View File

@ -77,6 +77,14 @@ abstract class JournalBase implements Journal {
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte recordType,
final byte[] record,
final boolean sync) throws Exception {
return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
public void appendUpdateRecordTransactional(final long txID,
final long id,
@ -136,6 +144,23 @@ abstract class JournalBase implements Journal {
}
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte recordType,
final Persister persister,
final Object record,
final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback);
if (callback != null) {
callback.waitCompletion();
}
return append;
}
@Override
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception {
SyncIOCompletion syncCompletion = getSyncCallback(sync);
@ -159,6 +184,18 @@ abstract class JournalBase implements Journal {
}
}
@Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
boolean result = tryAppendDeleteRecord(id, sync, callback);
if (callback != null) {
callback.waitCompletion();
}
return result;
}
abstract void scheduleReclaim();
protected SyncIOCompletion getSyncCallback(final boolean sync) {

View File

@ -884,7 +884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
checkKnownRecordID(id);
checkKnownRecordID(id, true);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id +
@ -892,8 +892,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
recordType);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte recordType,
final Persister persister,
final Object record,
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
if (!checkKnownRecordID(id, false)) {
if (callback != null) {
callback.done();
}
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType);
}
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
return true;
}
private void internalAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -946,8 +985,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded();
lineUpContext(callback);
checkKnownRecordID(id);
checkKnownRecordID(id, true);
internalAppendDeleteRecord(id, sync, callback);
return;
}
@Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecord::id=" + id);
}
checkJournalIsLoaded();
lineUpContext(callback);
if (!checkKnownRecordID(id, false)) {
if (callback != null) {
callback.done();
}
return false;
}
internalAppendDeleteRecord(id, sync, callback);
return true;
}
private void internalAppendDeleteRecord(long id,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
@ -1055,9 +1123,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
});
}
private void checkKnownRecordID(final long id) throws Exception {
private boolean checkKnownRecordID(final long id, boolean strict) throws Exception {
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) {
return;
return true;
}
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
@ -1079,8 +1147,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
});
if (!known.get()) {
if (strict) {
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
}
return false;
} else {
return true;
}
}
private void checkJournalIsLoaded() throws Exception {

View File

@ -1048,6 +1048,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void removeDestination(ActiveMQDestination dest) throws Exception {
if (dest.isQueue()) {
if (!dest.isTemporary()) {
// this should not really happen,
// so I'm not creating a Logger for this
logger.warn("OpenWire client sending a queue remove towards " + dest.getPhysicalName());
}
try {
server.destroyQueue(new SimpleString(dest.getPhysicalName()), getRemotingConnection());
} catch (ActiveMQNonExistentQueueException neq) {

View File

@ -194,15 +194,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void storeReference(long queueID, long messageID, boolean last) throws Exception;
void deleteMessage(long messageID) throws Exception;
boolean deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
boolean updateDeliveryCount(MessageReference ref) throws Exception;
void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception;
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;

View File

@ -425,25 +425,25 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public void deleteMessage(final long messageID) throws Exception {
public boolean deleteMessage(final long messageID) throws Exception {
readLock();
try {
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false));
} finally {
readUnLock();
}
}
@Override
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
readLock();
try {
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
@ -725,11 +725,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
// Other operations
@Override
public void updateDeliveryCount(final MessageReference ref) throws Exception {
public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
// no need to store if it's the same value
// otherwise the journal will get OME in case of lots of redeliveries
if (ref.getDeliveryCount() == ref.getPersistedCount()) {
return;
return true;
}
ref.setPersistedCount(ref.getDeliveryCount());
@ -737,7 +737,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
readLock();
try {
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}

View File

@ -226,7 +226,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void deleteMessage(final long messageID) throws Exception {
public boolean deleteMessage(final long messageID) throws Exception {
return true;
}
@Override
@ -238,7 +239,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
return true;
}
@Override
@ -250,7 +252,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void updateDeliveryCount(final MessageReference ref) throws Exception {
public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
return true;
}
@Override

View File

@ -207,6 +207,21 @@ public class ReplicatedJournal implements Journal {
localJournal.appendDeleteRecord(id, sync);
}
/**
* @param id
* @param sync
* @throws Exception
* @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean)
*/
@Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
if (log.isTraceEnabled()) {
log.trace("AppendDelete " + id);
}
replicationManager.appendDeleteRecord(journalID, id);
return localJournal.tryAppendDeleteRecord(id, sync);
}
@Override
public void appendDeleteRecord(final long id,
final boolean sync,
@ -218,6 +233,16 @@ public class ReplicatedJournal implements Journal {
localJournal.appendDeleteRecord(id, sync, completionCallback);
}
@Override
public boolean tryAppendDeleteRecord(final long id,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (log.isTraceEnabled()) {
log.trace("AppendDelete " + id);
}
replicationManager.appendDeleteRecord(journalID, id);
return localJournal.tryAppendDeleteRecord(id, sync, completionCallback);
}
/**
* @param txID
* @param id
@ -345,6 +370,15 @@ public class ReplicatedJournal implements Journal {
this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte recordType,
final byte[] record,
final boolean sync) throws Exception {
return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
/**
* @param id
* @param recordType
@ -366,6 +400,19 @@ public class ReplicatedJournal implements Journal {
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte recordType,
final Persister persister,
final Object record,
final boolean sync) throws Exception {
if (log.isTraceEnabled()) {
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync);
}
@Override
public void appendUpdateRecord(final long id,
final byte journalRecordType,
@ -380,6 +427,20 @@ public class ReplicatedJournal implements Journal {
localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
}
@Override
public boolean tryAppendUpdateRecord(final long id,
final byte journalRecordType,
final Persister persister,
final Object record,
final boolean sync,
final IOCompletion completionCallback) throws Exception {
if (log.isTraceEnabled()) {
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
}
/**
* @param txID
* @param id

View File

@ -3093,7 +3093,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
final long timeBase,
final boolean ignoreRedeliveryDelay) throws Exception {
Message message = reference.getMessage();
if (internalQueue) {
if (logger.isTraceEnabled()) {
@ -3104,7 +3103,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
if (!storageManager.updateDeliveryCount(reference)) {
return new Pair<>(false, false);
}
}
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@ -3739,7 +3740,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// as we can't delete each messaging with sync=true while adding messages transactionally.
// There is a startup check to remove non referenced messages case these deletes fail
try {
storageManager.deleteMessage(message.getMessageID());
if (!storageManager.deleteMessage(message.getMessageID())) {
ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID());
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
}

View File

@ -354,8 +354,8 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void deleteMessage(long messageID) throws Exception {
public boolean deleteMessage(long messageID) throws Exception {
return true;
}
@Override
@ -369,13 +369,13 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void updateDeliveryCount(MessageReference ref) throws Exception {
public boolean updateDeliveryCount(MessageReference ref) throws Exception {
return true;
}
@Override
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
return true;
}
@Override

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ForceDeleteQueue extends ActiveMQTestBase {
ActiveMQServer server;
String protocol = "openwire";
String uri = "tcp://localhost:61616";
public ForceDeleteQueue(String protocol) {
this.protocol = protocol;
}
@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{"openwire"}, {"core"}, {"amqp"}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
if (protocol.equals("openwire")) {
uri = "tcp://localhost:61616?jms.prefetchPolicy.all=5000";
}
server = createServer(true, true);
server.getAddressSettingsRepository().addMatch("#",
new AddressSettings().setMaxDeliveryAttempts(2));
server.start();
}
@Test
public void testForceDelete() throws Exception {
SimpleString queueName = SimpleString.toSimpleString("testForceDelete");
server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, uri);
Connection conn = factory.createConnection();
AssertionLoggerHandler.startCapture();
try {
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 1000; i++) {
TextMessage message = session.createTextMessage("Text " + i);
producer.send(message);
}
session.commit();
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Wait.assertEquals(1000, serverQueue::getMessageCount);
conn.close();
conn = factory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
LinkedListIterator<MessageReference> queueiterator = serverQueue.browserIterator();
ArrayList<Long> listQueue = new ArrayList<>(1000);
while (queueiterator.hasNext()) {
MessageReference ref = queueiterator.next();
listQueue.add(ref.getMessageID());
}
queueiterator.close();
MessageConsumer consumer = session.createConsumer(queue);
Wait.assertTrue(() -> serverQueue.getDeliveringCount() > 100);
for (Long l : listQueue) {
// this is forcing an artificial situation where the message was removed during a failure condition
server.getStorageManager().deleteMessage(l);
}
server.destroyQueue(queueName, null, false);
for (RemotingConnection connection : server.getRemotingService().getConnections()) {
connection.fail(new ActiveMQException("failure"));
}
Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add info"));
} finally {
AssertionLoggerHandler.stopCapture();
try {
conn.close();
} catch (Throwable ignored) {
}
}
}
}

View File

@ -433,8 +433,8 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
public void deleteMessage(long messageID) throws Exception {
manager.deleteMessage(messageID);
public boolean deleteMessage(long messageID) throws Exception {
return manager.deleteMessage(messageID);
}
@Override
@ -448,13 +448,13 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
public void updateDeliveryCount(MessageReference ref) throws Exception {
manager.updateDeliveryCount(ref);
public boolean updateDeliveryCount(MessageReference ref) throws Exception {
return manager.updateDeliveryCount(ref);
}
@Override
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
manager.updateScheduledDeliveryTime(ref);
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
return manager.updateScheduledDeliveryTime(ref);
}
@Override

View File

@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
@Override
public void deleteMessage(final long messageID) throws Exception {
public boolean deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID);
super.deleteMessage(messageID);
return super.deleteMessage(messageID);
}
};
}

View File

@ -649,6 +649,15 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public boolean tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
return true;
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
@ -659,6 +668,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public boolean tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws Exception {
return true;
}
@Override
public void appendAddRecordTransactional(long txID,
long id,
@ -729,6 +748,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
return true;
}
@Override
public void appendDeleteRecordTransactional(final long txID,
final long id,
@ -775,6 +799,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
return true;
}
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
@ -875,6 +904,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
final IOCompletion completionCallback) throws Exception {
}
@Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
return true;
}
@Override
public void appendPrepareRecord(final long txID,
final EncodingSupport transactionData,

View File

@ -39,8 +39,9 @@ public class FakeStorageManager extends NullStorageManager {
}
@Override
public void deleteMessage(final long messageID) throws Exception {
public boolean deleteMessage(final long messageID) throws Exception {
messageIds.remove(messageID);
return true;
}
@Override

View File

@ -117,6 +117,11 @@ under the License.
<multicast>
</multicast>
</address>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>

View File

@ -119,6 +119,11 @@ under the License.
<multicast>
</multicast>
</address>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>

View File

@ -41,6 +41,10 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase {
public static final int LAG_CONSUMER_TIME = 1000;
public static final int TIME_RUNNING = 4000;
public static final int CLIENT_KILLS = 2;
String protocol;
String consumerType;
boolean transaction;
@ -86,12 +90,13 @@ public class SoakPagingTest extends SmokeTestBase {
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")");
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
@ -158,25 +163,14 @@ public class SoakPagingTest extends SmokeTestBase {
@Test
public void testPagingReplication() throws Throwable {
Process queueProcess = null;
if (consumerType.equals("queue")) {
queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction);
}
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction);
if (i == 0) {
server1 = startServer(SERVER_NAME_1, 0, 30000);
}
for (int i = 0; i < CLIENT_KILLS; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "" + TIME_RUNNING, "" + transaction);
int result = process.waitFor();
Assert.assertTrue(result > 0);
}
if (queueProcess != null) {
Assert.assertTrue(queueProcess.waitFor() > 0);
}
}
public void produce(ConnectionFactory factory) {
@ -261,7 +255,8 @@ public class SoakPagingTest extends SmokeTestBase {
messageConsumer = session.createConsumer(address);
}
Thread.sleep(5000);
if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
connection.start();
int i = 0;

View File

@ -385,6 +385,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
journal.debugWait();
}
protected boolean tryUpdate(final long argument) throws Exception {
byte[] updateRecord = generateRecord(recordLength);
beforeJournalOperation();
boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync);
if (result) {
records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
}
return result;
}
protected void update(final long... arguments) throws Exception {
for (long element : arguments) {
byte[] updateRecord = generateRecord(recordLength);
@ -411,6 +425,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
journal.debugWait();
}
protected boolean tryDelete(final long argument) throws Exception {
beforeJournalOperation();
boolean result = journal.tryAppendDeleteRecord(argument, sync);
if (result) {
removeRecordsForID(argument);
}
journal.debugWait();
return result;
}
protected void addTx(final long txID, final long... arguments) throws Exception {
TransactionHolder tx = getTransaction(txID);

View File

@ -2688,6 +2688,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
loadAndCheck();
}
@Test
public void testTryIsolation2() throws Exception {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
addTx(1, 1, 2, 3);
Assert.assertFalse(tryUpdate(1));
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
@Test
public void testIsolation3() throws Exception {
setup(10, 10 * 1024, true);
@ -2708,6 +2724,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
loadAndCheck();
}
@Test
public void testTryDelete() throws Exception {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
addTx(1, 1, 2, 3);
Assert.assertFalse(tryDelete(1));
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
// XA tests
// ========