From cfd032799c6704e480a180bdc27a3d28def099d9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 2 Jun 2021 12:53:36 -0400 Subject: [PATCH] ARTEMIS-3327 removing unecessary blocking operations on update and delete records --- .../jdbc/store/journal/JDBCJournalImpl.java | 17 +- .../artemis/core/journal/Journal.java | 20 +- .../core/journal/JournalUpdateCallback.java | 23 +++ .../core/journal/impl/FileWrapperJournal.java | 8 +- .../core/journal/impl/JournalBase.java | 19 +- .../core/journal/impl/JournalImpl.java | 185 +++++++++--------- .../core/persistence/StorageManager.java | 6 +- .../AbstractJournalStorageManager.java | 70 ++++--- .../impl/nullpm/NullStorageManager.java | 9 +- .../core/replication/ReplicatedJournal.java | 25 ++- .../core/replication/ReplicationEndpoint.java | 2 +- .../core/server/ActiveMQServerLogger.java | 4 +- .../artemis/core/server/impl/QueueImpl.java | 10 +- .../transaction/impl/TransactionImplTest.java | 9 +- .../integration/client/SendAckFailTest.java | 12 +- .../DeleteMessagesOnStartupTest.java | 4 +- .../replication/ReplicationTest.java | 20 +- .../server/FakeStorageManager.java | 3 +- .../journal/impl/JournalImplTestBase.java | 18 +- 19 files changed, 245 insertions(+), 219 deletions(-) create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 9015c4e0d5..154d39a9cf 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -497,9 +498,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { appendUpdateRecord(id, recordType, record, sync); - return true; } @Override @@ -518,9 +518,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { appendUpdateRecord(id, recordType, persister, record, sync); - return true; } @Override @@ -548,14 +547,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override - public boolean tryAppendUpdateRecord(long id, + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync, + JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { appendUpdateRecord(id, recordType, persister, record, sync, completionCallback); - return true; } @@ -574,9 +573,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { + public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception { appendDeleteRecord(id, sync); - return true; } @Override @@ -596,9 +594,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { + public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { appendDeleteRecord(id, sync, completionCallback); - return true; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index d0734a8f5b..6618e8d90f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -110,19 +110,19 @@ public interface Journal extends ActiveMQComponent { void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; + void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception; default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); } - default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { - return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { + tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync); } void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; - boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; + void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception; default void appendUpdateRecord(long id, byte recordType, @@ -132,12 +132,13 @@ public interface Journal extends ActiveMQComponent { appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); } - default boolean tryAppendUpdateRecord(long id, + default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, + JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { - return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, updateCallback, completionCallback); } void appendUpdateRecord(long id, @@ -147,20 +148,21 @@ public interface Journal extends ActiveMQComponent { boolean sync, IOCompletion callback) throws Exception; - boolean tryAppendUpdateRecord(long id, + void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync, + JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception; void appendDeleteRecord(long id, boolean sync) throws Exception; - boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception; + void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception; void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; - boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; + void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception; // Transactional operations diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java new file mode 100644 index 0000000000..2d595af474 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.journal; + + +public interface JournalUpdateCallback { + void onUpdate(long record, boolean result); +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 41da3ead2c..401797e4c4 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -190,9 +191,8 @@ public final class FileWrapperJournal extends JournalBase { @Override - public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception { + public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception { appendDeleteRecord(id, sync, callback); - return true; } @Override @@ -223,15 +223,15 @@ public final class FileWrapperJournal extends JournalBase { } @Override - public boolean tryAppendUpdateRecord(long id, + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync, + JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception { JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); writeRecord(updateRecord, false, -1, false, callback); - return true; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index c1a61a55ba..d2df4de787 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; @@ -89,11 +90,12 @@ abstract class JournalBase implements Journal { } @Override - public boolean tryAppendUpdateRecord(final long id, + public void tryAppendUpdateRecord(final long id, final byte recordType, final byte[] record, + JournalUpdateCallback updateCallback, final boolean sync) throws Exception { - return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); + tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync); } @Override @@ -156,20 +158,19 @@ abstract class JournalBase implements Journal { } @Override - public boolean tryAppendUpdateRecord(final long id, + public void tryAppendUpdateRecord(final long id, final byte recordType, final Persister persister, final Object record, + final JournalUpdateCallback updateCallback, final boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback); + tryAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback); if (callback != null) { callback.waitCompletion(); } - - return append; } @Override @@ -196,16 +197,14 @@ abstract class JournalBase implements Journal { } @Override - public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { + public void tryAppendDeleteRecord(final long id, JournalUpdateCallback updateCallback, final boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - boolean result = tryAppendDeleteRecord(id, sync, callback); + tryAppendDeleteRecord(id, sync, updateCallback, callback); if (callback != null) { callback.waitCompletion(); } - - return result; } abstract void scheduleReclaim(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 4bb2647916..aac1554237 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -87,7 +88,6 @@ import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; -import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.jboss.logging.Logger; @@ -288,8 +288,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Compacting may replace this structure private final ConcurrentLongHashMap records = new ConcurrentLongHashMap<>(); - private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet(); - // Compacting may replace this structure private final ConcurrentLongHashMap transactions = new ConcurrentLongHashMap<>(); @@ -908,7 +906,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final IOCompletion callback) throws Exception { checkJournalIsLoaded(); lineUpContext(callback); - pendingRecords.add(id); if (logger.isTraceEnabled()) { logger.trace("scheduling appendAddRecord::id=" + id + @@ -952,7 +949,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal setErrorCondition(callback, null, e); logger.error("appendAddRecord::" + e, e); } finally { - pendingRecords.remove(id); journalLock.readLock().unlock(); } } @@ -1011,7 +1007,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal setErrorCondition(callback, null, e); logger.error("appendAddEvent::" + e, e); } finally { - pendingRecords.remove(id); journalLock.readLock().unlock(); } }); @@ -1028,7 +1023,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final IOCompletion callback) throws Exception { checkJournalIsLoaded(); lineUpContext(callback); - checkKnownRecordID(id, true); if (logger.isTraceEnabled()) { logger.trace("scheduling appendUpdateRecord::id=" + id + @@ -1036,27 +1030,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal recordType); } - internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); + SimpleFuture future = new SimpleFutureImpl<>(); + + internalAppendUpdateRecord(id, recordType, persister, record, sync, (t, v) -> future.set(v), callback); + + if (!future.get()) { + throw new IllegalStateException("Cannot find add info " + id); + } } @Override - public boolean tryAppendUpdateRecord(final long id, - final byte recordType, - final Persister persister, - final Object record, - final boolean sync, - final IOCompletion callback) throws Exception { + public void tryAppendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync, + JournalUpdateCallback updateCallback, + final IOCompletion callback) throws Exception { checkJournalIsLoaded(); lineUpContext(callback); - if (!checkKnownRecordID(id, false)) { - if (callback != null) { - callback.done(); - } - return false; - } - if (logger.isTraceEnabled()) { logger.trace("scheduling appendUpdateRecord::id=" + id + ", userRecordType=" + @@ -1064,9 +1058,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } - internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); - - return true; + internalAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback); } @@ -1075,14 +1067,32 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal Persister persister, Object record, boolean sync, + JournalUpdateCallback updateCallback, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); try { + // compactor will never change while readLock is acquired. + // but we are doing this since compactor is volatile, to avoid some extra work from JIT + JournalCompactor compactor = JournalImpl.this.compactor; JournalRecord jrnRecord = records.get(id); + if (jrnRecord == null) { + if (compactor == null || (!compactor.containsRecord(id))) { + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } + if (logger.isDebugEnabled()) { + logger.debug("Record " + id + " had not been found"); + } + + if (callback != null) { + callback.done(); + } + return; + } + } JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); @@ -1097,17 +1107,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // record==null here could only mean there is a compactor // computing the delete should be done after compacting is done if (jrnRecord == null) { - compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); + if (compactor != null) { + compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); + } } else { jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); } - result.set(true); + if (updateCallback != null) { + updateCallback.onUpdate(id, true); + } } catch (ActiveMQShutdownException e) { - result.fail(e); + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } logger.error("appendUpdateRecord:" + e, e); } catch (Throwable e) { - result.fail(e); + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } setErrorCondition(callback, null, e); logger.error("appendUpdateRecord:" + e, e); } finally { @@ -1115,8 +1133,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } }); - - result.get(); } @Override @@ -1129,15 +1145,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal checkJournalIsLoaded(); lineUpContext(callback); - checkKnownRecordID(id, true); - - internalAppendDeleteRecord(id, sync, callback); + SimpleFuture future = new SimpleFutureImpl<>(); + internalAppendDeleteRecord(id, sync, (t, v) -> future.set(v), callback); + if (!future.get()) { + throw new IllegalStateException("Cannot find add info " + id); + } return; } @Override - public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { + public void tryAppendDeleteRecord(final long id, final boolean sync, final JournalUpdateCallback updateCallback, final IOCompletion callback) throws Exception { if (logger.isTraceEnabled()) { logger.trace("scheduling appendDeleteRecord::id=" + id); @@ -1146,29 +1164,45 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal checkJournalIsLoaded(); lineUpContext(callback); - if (!checkKnownRecordID(id, false)) { - if (callback != null) { - callback.done(); - } - return false; - } - - internalAppendDeleteRecord(id, sync, callback); - return true; + internalAppendDeleteRecord(id, sync, updateCallback, callback); } private void internalAppendDeleteRecord(long id, boolean sync, - IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + JournalUpdateCallback updateCallback, + IOCompletion callback) { + appendExecutor.execute(new Runnable() { @Override public void run() { journalLock.readLock().lock(); try { + // compactor will never change while readLock is acquired. + // but we are doing this since compactor is volatile, to avoid some extra work from JIT + JournalCompactor compactor = JournalImpl.this.compactor; JournalRecord record = null; if (compactor == null) { record = records.remove(id); + if (record == null) { + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } + + if (callback != null) { + callback.done(); + } + return; + } + } else { + if (!records.containsKey(id) && !compactor.containsRecord(id)) { + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } + if (callback != null) { + callback.done(); + } + return; + } } JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); @@ -1182,20 +1216,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // computing the delete should be done after compacting is done if (record == null) { // JournalImplTestUni::testDoubleDelete was written to validate this condition: - if (compactor == null) { - logger.debug("Record " + id + " had been deleted already from a different call"); - } else { - compactor.addCommandDelete(id, usedFile); - } + compactor.addCommandDelete(id, usedFile); } else { record.delete(usedFile); } - result.set(true); + if (updateCallback != null) { + updateCallback.onUpdate(id, true); + } } catch (ActiveMQShutdownException e) { - result.fail(e); + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } logger.error("appendDeleteRecord:" + e, e); } catch (Throwable e) { - result.fail(e); + if (updateCallback != null) { + updateCallback.onUpdate(id, false); + } logger.error("appendDeleteRecord:" + e, e); setErrorCondition(callback, null, e); } finally { @@ -1203,8 +1239,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } }); - - result.get(); } private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { @@ -1266,45 +1300,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); } - - private boolean checkKnownRecordID(final long id, boolean strict) throws Exception { - if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) { - return true; - } - - final SimpleFuture known = new SimpleFutureImpl<>(); - - // retry on the append thread. maybe the appender thread is not keeping up. - appendExecutor.execute(new Runnable() { - @Override - public void run() { - try { - journalLock.readLock().lock(); - try { - - known.set(records.containsKey(id) - || pendingRecords.contains(id) - || (compactor != null && compactor.containsRecord(id))); - } finally { - journalLock.readLock().unlock(); - } - } catch (Throwable t) { - known.fail(t); - throw t; - } - } - }); - - if (!known.get()) { - if (strict) { - throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); - } - return false; - } else { - return true; - } - } - private void checkJournalIsLoaded() throws Exception { if (state != JournalState.LOADED && state != JournalState.SYNCING) { throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index ca53c7c24c..748f682685 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -198,15 +198,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void storeReference(long queueID, long messageID, boolean last) throws Exception; - boolean deleteMessage(long messageID) throws Exception; + void deleteMessage(long messageID) throws Exception; void storeAcknowledge(long queueID, long messageID) throws Exception; void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception; - boolean updateDeliveryCount(MessageReference ref) throws Exception; + void updateDeliveryCount(MessageReference ref) throws Exception; - boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception; + void updateScheduledDeliveryTime(MessageReference ref) throws Exception; void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 632d8b9786..a447285aab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -360,7 +360,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void confirmPendingLargeMessage(long recordID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendDeleteRecord(recordID, true, getContext()); + messageJournal.tryAppendDeleteRecord(recordID, true, this::messageUpdateCallback, getContext()); } } @@ -385,7 +385,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, this::messageUpdateCallback, getContext(last && syncNonTransactional)); } } @@ -428,7 +428,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional)); } } @@ -442,21 +442,35 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public boolean deleteMessage(final long messageID) throws Exception { + public void deleteMessage(final long messageID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { // Messages are deleted on postACK, one after another. // If these deletes are synchronized, we would build up messages on the Executor // increasing chances of losing deletes. // The StorageManager should verify messages without references - return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false)); + messageJournal.tryAppendDeleteRecord(messageID, false, this::messageUpdateCallback, getContext(false)); + } + } + + private void messageUpdateCallback(long id, boolean found) { + if (!found) { + ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(new Exception(), id); + } + } + + private void recordNotFoundCallback(long id, boolean found) { + if (!found) { + if (logger.isDebugEnabled()) { + logger.debug("Record " + id + " not found"); + } } } @Override - public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { + public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); try (ArtemisCloseable lock = closeableReadLock()) { - return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional)); } } @@ -472,7 +486,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteDuplicateID(final long recordID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional)); + messageJournal.tryAppendDeleteRecord(recordID, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional)); } } @@ -546,7 +560,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deletePageComplete(long ackID) throws Exception { - messageJournal.appendDeleteRecord(ackID, false); + messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false); } @Override @@ -558,7 +572,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteCursorAcknowledge(long ackID) throws Exception { - messageJournal.appendDeleteRecord(ackID, false); + messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false); } @Override @@ -574,14 +588,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteHeuristicCompletion(final long id) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendDeleteRecord(id, true, getContext(true)); + messageJournal.tryAppendDeleteRecord(id, true, this::recordNotFoundCallback, getContext(true)); } } @Override public void deletePageTransactional(final long recordID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.appendDeleteRecord(recordID, false); + messageJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, false); } } @@ -677,18 +691,18 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp // Other operations @Override - public boolean updateDeliveryCount(final MessageReference ref) throws Exception { + public void updateDeliveryCount(final MessageReference ref) throws Exception { // no need to store if it's the same value // otherwise the journal will get OME in case of lots of redeliveries if (ref.getDeliveryCount() == ref.getPersistedCount()) { - return true; + return; } ref.setPersistedCount(ref.getDeliveryCount()); DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount()); try (ArtemisCloseable lock = closeableReadLock()) { - return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional)); } } @@ -741,7 +755,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName); if (oldDivert != null) { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false); + bindingsJournal.tryAppendDeleteRecord(oldDivert.getStoreId(), this::recordNotFoundCallback, false); } } } @@ -767,7 +781,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistedUser oldUser = mapPersistedUsers.remove(username); if (oldUser != null) { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(oldUser.getStoreId(), false); + bindingsJournal.tryAppendDeleteRecord(oldUser.getStoreId(), this::recordNotFoundCallback, false); } } } @@ -793,7 +807,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistedRole oldRole = mapPersistedRoles.remove(username); if (oldRole != null) { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(oldRole.getStoreId(), false); + bindingsJournal.tryAppendDeleteRecord(oldRole.getStoreId(), this::recordNotFoundCallback, false); } } } @@ -813,7 +827,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteID(long journalD) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(journalD, false); + bindingsJournal.tryAppendDeleteRecord(journalD, this::recordNotFoundCallback, false); } } @@ -822,7 +836,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); if (oldSetting != null) { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false); + bindingsJournal.tryAppendDeleteRecord(oldSetting.getStoreId(), this::recordNotFoundCallback, false); } } } @@ -832,7 +846,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch); if (oldRoles != null) { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false); + bindingsJournal.tryAppendDeleteRecord(oldRoles.getStoreId(), this::recordNotFoundCallback, false); } } } @@ -1094,7 +1108,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp sub.reloadACK(encoding.position); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID); - messageJournal.appendDeleteRecord(record.id, false); + messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); } @@ -1111,7 +1125,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize()); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); - messageJournal.appendDeleteRecord(record.id, false); + messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); } break; @@ -1128,7 +1142,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize()); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID()); - messageJournal.appendDeleteRecord(record.id, false); + messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); } break; @@ -1147,11 +1161,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (logger.isDebugEnabled()) { logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress()); } - messageJournal.appendDeleteRecord(record.id, false); + messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); } } else { ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID); - messageJournal.appendDeleteRecord(record.id, false); + messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); } break; @@ -1332,7 +1346,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteQueueStatus(long recordID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(recordID, true); + bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true); } } @@ -1350,7 +1364,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteAddressStatus(long recordID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.appendDeleteRecord(recordID, true); + bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 3c3824caaf..61a95c8001 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -237,8 +237,7 @@ public class NullStorageManager implements StorageManager { } @Override - public boolean deleteMessage(final long messageID) throws Exception { - return true; + public void deleteMessage(final long messageID) throws Exception { } @Override @@ -250,8 +249,7 @@ public class NullStorageManager implements StorageManager { } @Override - public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { - return true; + public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { } @Override @@ -263,8 +261,7 @@ public class NullStorageManager implements StorageManager { } @Override - public boolean updateDeliveryCount(final MessageReference ref) throws Exception { - return true; + public void updateDeliveryCount(final MessageReference ref) throws Exception { } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 4661149183..58f7069a82 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -238,12 +239,12 @@ public class ReplicatedJournal implements Journal { * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean) */ @Override - public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { + public void tryAppendDeleteRecord(final long id, final JournalUpdateCallback updateCallback, final boolean sync) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendDelete " + id); } replicationManager.appendDeleteRecord(journalID, id); - return localJournal.tryAppendDeleteRecord(id, sync); + localJournal.tryAppendDeleteRecord(id, updateCallback, sync); } @Override @@ -258,14 +259,15 @@ public class ReplicatedJournal implements Journal { } @Override - public boolean tryAppendDeleteRecord(final long id, + public void tryAppendDeleteRecord(final long id, final boolean sync, + final JournalUpdateCallback updateCallback, final IOCompletion completionCallback) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendDelete " + id); } replicationManager.appendDeleteRecord(journalID, id); - return localJournal.tryAppendDeleteRecord(id, sync, completionCallback); + localJournal.tryAppendDeleteRecord(id, sync, updateCallback, completionCallback); } /** * @param txID @@ -395,12 +397,13 @@ public class ReplicatedJournal implements Journal { } @Override - public boolean tryAppendUpdateRecord(final long id, + public void tryAppendUpdateRecord(final long id, final byte recordType, final byte[] record, + final JournalUpdateCallback updateCallback, final boolean sync) throws Exception { - return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); + this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync); } /** @@ -425,16 +428,17 @@ public class ReplicatedJournal implements Journal { } @Override - public boolean tryAppendUpdateRecord(final long id, + public void tryAppendUpdateRecord(final long id, final byte recordType, final Persister persister, final Object record, + final JournalUpdateCallback updateCallback, final boolean sync) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType); } replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record); - return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync); + localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync); } @Override @@ -452,17 +456,18 @@ public class ReplicatedJournal implements Journal { } @Override - public boolean tryAppendUpdateRecord(final long id, + public void tryAppendUpdateRecord(final long id, final byte journalRecordType, final Persister persister, final Object record, final boolean sync, + final JournalUpdateCallback updateCallback, final IOCompletion completionCallback) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType); } replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record); - return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback); + localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, updateCallback, completionCallback); } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 12202609c2..b173f3afdc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -730,7 +730,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon */ private void handleAppendDelete(final ReplicationDeleteMessage packet) throws Exception { Journal journalToUse = getJournal(packet.getJournalID()); - journalToUse.appendDeleteRecord(packet.getId(), noSync); + journalToUse.tryAppendDeleteRecord(packet.getId(), null, noSync); } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index c754c19984..59a4917cc6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1077,9 +1077,9 @@ public interface ActiveMQServerLogger extends BasicLogger { void errorDecrementingRefCount(@Cause Throwable e); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222153, value = "Unable to remove message id = {0} please remove manually", + @Message(id = 222153, value = "Cannot locate record for message id = {0} on Journal", format = Message.Format.MESSAGE_FORMAT) - void errorRemovingMessage(@Cause Throwable e, Long messageID); + void cannotFindMessageOnJournal(@Cause Throwable e, Long messageID); @LogMessage(level = Logger.Level.WARN) @Message(id = 222154, value = "Error checking DLQ", diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a8ce01dfb0..d97891d360 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3278,9 +3278,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { - if (!storageManager.updateDeliveryCount(reference)) { - return new Pair<>(false, false); - } + storageManager.updateDeliveryCount(reference); } AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); @@ -3920,11 +3918,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // as we can't delete each messaging with sync=true while adding messages transactionally. // There is a startup check to remove non referenced messages case these deletes fail try { - if (!storageManager.deleteMessage(message.getMessageID())) { - ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID()); - } + storageManager.deleteMessage(message.getMessageID()); } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID()); + ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(e, message.getMessageID()); } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index bd1979b28b..77ba7ee81c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -370,8 +370,7 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public boolean deleteMessage(long messageID) throws Exception { - return true; + public void deleteMessage(long messageID) throws Exception { } @Override @@ -385,13 +384,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public boolean updateDeliveryCount(MessageReference ref) throws Exception { - return true; + public void updateDeliveryCount(MessageReference ref) throws Exception { } @Override - public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { - return true; + public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 64a4c651a8..01b1c22a13 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -441,8 +441,8 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public boolean deleteMessage(long messageID) throws Exception { - return manager.deleteMessage(messageID); + public void deleteMessage(long messageID) throws Exception { + manager.deleteMessage(messageID); } @Override @@ -456,13 +456,13 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public boolean updateDeliveryCount(MessageReference ref) throws Exception { - return manager.updateDeliveryCount(ref); + public void updateDeliveryCount(MessageReference ref) throws Exception { + manager.updateDeliveryCount(ref); } @Override - public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { - return manager.updateScheduledDeliveryTime(ref); + public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { + manager.updateScheduledDeliveryTime(ref); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java index 3fc2a05561..e69d4d9b33 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java @@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase { protected JournalStorageManager createJournalStorageManager(Configuration configuration) { return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) { @Override - public boolean deleteMessage(final long messageID) throws Exception { + public void deleteMessage(final long messageID) throws Exception { deletedMessage.add(messageID); - return super.deleteMessage(messageID); + super.deleteMessage(messageID); } }; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 24342ffb6e..3552f2a4e3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -696,12 +697,11 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public boolean tryAppendUpdateRecord(long id, + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, - Object record, + Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { - return true; } @Override @@ -715,13 +715,12 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public boolean tryAppendUpdateRecord(long id, + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, - boolean sync, + boolean sync, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception { - return true; } @Override @@ -795,8 +794,7 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { - return true; + public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateConsumer, boolean sync) throws Exception { } @Override @@ -846,8 +844,7 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { - return true; + public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { } @Override @@ -951,8 +948,7 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { - return true; + public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java index c38bc453c5..67cfe18f41 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java @@ -39,9 +39,8 @@ public class FakeStorageManager extends NullStorageManager { } @Override - public boolean deleteMessage(final long messageID) throws Exception { + public void deleteMessage(final long messageID) throws Exception { messageIds.remove(messageID); - return true; } @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index 34b2d9af7e..0ec7f5c672 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal; import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal; @@ -41,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.jboss.logging.Logger; import org.junit.After; @@ -418,13 +420,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { beforeJournalOperation(); - boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync); + SimpleFutureImpl future = new SimpleFutureImpl(); - if (result) { + journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync); + + if (future.get()) { + Assert.fail(); records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0)); } - return result; + return future.get(); } protected void update(final long... arguments) throws Exception { @@ -456,15 +461,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { protected boolean tryDelete(final long argument) throws Exception { beforeJournalOperation(); - boolean result = journal.tryAppendDeleteRecord(argument, sync); + AtomicBoolean result = new AtomicBoolean(true); + journal.tryAppendDeleteRecord(argument, (t, b) -> result.set(b), sync); - if (result) { + if (result.get()) { removeRecordsForID(argument); } journal.debugWait(); - return result; + return result.get(); } protected void addTx(final long txID, final long... arguments) throws Exception {