ARTEMIS-3327 removing unecessary blocking operations on update and delete records

This commit is contained in:
Clebert Suconic 2021-06-02 12:53:36 -04:00 committed by clebertsuconic
parent f4f31df97b
commit cfd032799c
19 changed files with 245 additions and 219 deletions

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -497,9 +498,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, record, sync); appendUpdateRecord(id, recordType, record, sync);
return true;
} }
@Override @Override
@ -518,9 +518,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, persister, record, sync); appendUpdateRecord(id, recordType, persister, record, sync);
return true;
} }
@Override @Override
@ -548,14 +547,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override @Override
public boolean tryAppendUpdateRecord(long id, public void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
Persister persister, Persister persister,
Object record, Object record,
boolean sync, boolean sync,
JournalUpdateCallback updateCallback,
IOCompletion completionCallback) throws Exception { IOCompletion completionCallback) throws Exception {
appendUpdateRecord(id, recordType, persister, record, sync, completionCallback); appendUpdateRecord(id, recordType, persister, record, sync, completionCallback);
return true;
} }
@ -574,9 +573,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
appendDeleteRecord(id, sync); appendDeleteRecord(id, sync);
return true;
} }
@Override @Override
@ -596,9 +594,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception {
appendDeleteRecord(id, sync, completionCallback); appendDeleteRecord(id, sync, completionCallback);
return true;
} }
@Override @Override

View File

@ -110,19 +110,19 @@ public interface Journal extends ActiveMQComponent {
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
} }
default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync);
} }
void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
default void appendUpdateRecord(long id, default void appendUpdateRecord(long id,
byte recordType, byte recordType,
@ -132,12 +132,13 @@ public interface Journal extends ActiveMQComponent {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
} }
default boolean tryAppendUpdateRecord(long id, default void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
EncodingSupport record, EncodingSupport record,
boolean sync, boolean sync,
JournalUpdateCallback updateCallback,
IOCompletion completionCallback) throws Exception { IOCompletion completionCallback) throws Exception {
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, updateCallback, completionCallback);
} }
void appendUpdateRecord(long id, void appendUpdateRecord(long id,
@ -147,20 +148,21 @@ public interface Journal extends ActiveMQComponent {
boolean sync, boolean sync,
IOCompletion callback) throws Exception; IOCompletion callback) throws Exception;
boolean tryAppendUpdateRecord(long id, void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
Persister persister, Persister persister,
Object record, Object record,
boolean sync, boolean sync,
JournalUpdateCallback updateCallback,
IOCompletion callback) throws Exception; IOCompletion callback) throws Exception;
void appendDeleteRecord(long id, boolean sync) throws Exception; void appendDeleteRecord(long id, boolean sync) throws Exception;
boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception; void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception;
// Transactional operations // Transactional operations

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal;
public interface JournalUpdateCallback {
void onUpdate(long record, boolean result);
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -190,9 +191,8 @@ public final class FileWrapperJournal extends JournalBase {
@Override @Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception { public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception {
appendDeleteRecord(id, sync, callback); appendDeleteRecord(id, sync, callback);
return true;
} }
@Override @Override
@ -223,15 +223,15 @@ public final class FileWrapperJournal extends JournalBase {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, public void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
Persister persister, Persister persister,
Object record, Object record,
boolean sync, boolean sync,
JournalUpdateCallback updateCallback,
IOCompletion callback) throws Exception { IOCompletion callback) throws Exception {
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
writeRecord(updateRecord, false, -1, false, callback); writeRecord(updateRecord, false, -1, false, callback);
return true;
} }
@Override @Override

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
@ -89,11 +90,12 @@ abstract class JournalBase implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte recordType, final byte recordType,
final byte[] record, final byte[] record,
JournalUpdateCallback updateCallback,
final boolean sync) throws Exception { final boolean sync) throws Exception {
return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
} }
@Override @Override
@ -156,20 +158,19 @@ abstract class JournalBase implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte recordType, final byte recordType,
final Persister persister, final Persister persister,
final Object record, final Object record,
final JournalUpdateCallback updateCallback,
final boolean sync) throws Exception { final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync); SyncIOCompletion callback = getSyncCallback(sync);
boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback); tryAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
if (callback != null) { if (callback != null) {
callback.waitCompletion(); callback.waitCompletion();
} }
return append;
} }
@Override @Override
@ -196,16 +197,14 @@ abstract class JournalBase implements Journal {
} }
@Override @Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { public void tryAppendDeleteRecord(final long id, JournalUpdateCallback updateCallback, final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync); SyncIOCompletion callback = getSyncCallback(sync);
boolean result = tryAppendDeleteRecord(id, sync, callback); tryAppendDeleteRecord(id, sync, updateCallback, callback);
if (callback != null) { if (callback != null) {
callback.waitCompletion(); callback.waitCompletion();
} }
return result;
} }
abstract void scheduleReclaim(); abstract void scheduleReclaim();

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -87,7 +88,6 @@ import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -288,8 +288,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure // Compacting may replace this structure
private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>(); private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet();
// Compacting may replace this structure // Compacting may replace this structure
private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>(); private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
@ -908,7 +906,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final IOCompletion callback) throws Exception { final IOCompletion callback) throws Exception {
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
pendingRecords.add(id);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecord::id=" + id + logger.trace("scheduling appendAddRecord::id=" + id +
@ -952,7 +949,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setErrorCondition(callback, null, e); setErrorCondition(callback, null, e);
logger.error("appendAddRecord::" + e, e); logger.error("appendAddRecord::" + e, e);
} finally { } finally {
pendingRecords.remove(id);
journalLock.readLock().unlock(); journalLock.readLock().unlock();
} }
} }
@ -1011,7 +1007,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setErrorCondition(callback, null, e); setErrorCondition(callback, null, e);
logger.error("appendAddEvent::" + e, e); logger.error("appendAddEvent::" + e, e);
} finally { } finally {
pendingRecords.remove(id);
journalLock.readLock().unlock(); journalLock.readLock().unlock();
} }
}); });
@ -1028,7 +1023,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final IOCompletion callback) throws Exception { final IOCompletion callback) throws Exception {
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
checkKnownRecordID(id, true);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id + logger.trace("scheduling appendUpdateRecord::id=" + id +
@ -1036,27 +1030,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
recordType); recordType);
} }
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); SimpleFuture<Boolean> future = new SimpleFutureImpl<>();
internalAppendUpdateRecord(id, recordType, persister, record, sync, (t, v) -> future.set(v), callback);
if (!future.get()) {
throw new IllegalStateException("Cannot find add info " + id);
}
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte recordType, final byte recordType,
final Persister persister, final Persister persister,
final Object record, final Object record,
final boolean sync, final boolean sync,
final IOCompletion callback) throws Exception { JournalUpdateCallback updateCallback,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
if (!checkKnownRecordID(id, false)) {
if (callback != null) {
callback.done();
}
return false;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id + logger.trace("scheduling appendUpdateRecord::id=" + id +
", userRecordType=" + ", userRecordType=" +
@ -1064,9 +1058,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); internalAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
return true;
} }
@ -1075,14 +1067,32 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
Persister persister, Persister persister,
Object record, Object record,
boolean sync, boolean sync,
JournalUpdateCallback updateCallback,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
try { try {
// compactor will never change while readLock is acquired.
// but we are doing this since compactor is volatile, to avoid some extra work from JIT
JournalCompactor compactor = JournalImpl.this.compactor;
JournalRecord jrnRecord = records.get(id); JournalRecord jrnRecord = records.get(id);
if (jrnRecord == null) {
if (compactor == null || (!compactor.containsRecord(id))) {
if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
if (logger.isDebugEnabled()) {
logger.debug("Record " + id + " had not been found");
}
if (callback != null) {
callback.done();
}
return;
}
}
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
@ -1097,17 +1107,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// record==null here could only mean there is a compactor // record==null here could only mean there is a compactor
// computing the delete should be done after compacting is done // computing the delete should be done after compacting is done
if (jrnRecord == null) { if (jrnRecord == null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); if (compactor != null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
}
} else { } else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
} }
result.set(true); if (updateCallback != null) {
updateCallback.onUpdate(id, true);
}
} catch (ActiveMQShutdownException e) { } catch (ActiveMQShutdownException e) {
result.fail(e); if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
logger.error("appendUpdateRecord:" + e, e); logger.error("appendUpdateRecord:" + e, e);
} catch (Throwable e) { } catch (Throwable e) {
result.fail(e); if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
setErrorCondition(callback, null, e); setErrorCondition(callback, null, e);
logger.error("appendUpdateRecord:" + e, e); logger.error("appendUpdateRecord:" + e, e);
} finally { } finally {
@ -1115,8 +1133,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
} }
}); });
result.get();
} }
@Override @Override
@ -1129,15 +1145,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
checkKnownRecordID(id, true); SimpleFuture<Boolean> future = new SimpleFutureImpl<>();
internalAppendDeleteRecord(id, sync, (t, v) -> future.set(v), callback);
internalAppendDeleteRecord(id, sync, callback); if (!future.get()) {
throw new IllegalStateException("Cannot find add info " + id);
}
return; return;
} }
@Override @Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { public void tryAppendDeleteRecord(final long id, final boolean sync, final JournalUpdateCallback updateCallback, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecord::id=" + id); logger.trace("scheduling appendDeleteRecord::id=" + id);
@ -1146,29 +1164,45 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
if (!checkKnownRecordID(id, false)) { internalAppendDeleteRecord(id, sync, updateCallback, callback);
if (callback != null) {
callback.done();
}
return false;
}
internalAppendDeleteRecord(id, sync, callback);
return true;
} }
private void internalAppendDeleteRecord(long id, private void internalAppendDeleteRecord(long id,
boolean sync, boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { JournalUpdateCallback updateCallback,
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); IOCompletion callback) {
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
try { try {
// compactor will never change while readLock is acquired.
// but we are doing this since compactor is volatile, to avoid some extra work from JIT
JournalCompactor compactor = JournalImpl.this.compactor;
JournalRecord record = null; JournalRecord record = null;
if (compactor == null) { if (compactor == null) {
record = records.remove(id); record = records.remove(id);
if (record == null) {
if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
if (callback != null) {
callback.done();
}
return;
}
} else {
if (!records.containsKey(id) && !compactor.containsRecord(id)) {
if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
if (callback != null) {
callback.done();
}
return;
}
} }
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
@ -1182,20 +1216,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// computing the delete should be done after compacting is done // computing the delete should be done after compacting is done
if (record == null) { if (record == null) {
// JournalImplTestUni::testDoubleDelete was written to validate this condition: // JournalImplTestUni::testDoubleDelete was written to validate this condition:
if (compactor == null) { compactor.addCommandDelete(id, usedFile);
logger.debug("Record " + id + " had been deleted already from a different call");
} else {
compactor.addCommandDelete(id, usedFile);
}
} else { } else {
record.delete(usedFile); record.delete(usedFile);
} }
result.set(true); if (updateCallback != null) {
updateCallback.onUpdate(id, true);
}
} catch (ActiveMQShutdownException e) { } catch (ActiveMQShutdownException e) {
result.fail(e); if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
logger.error("appendDeleteRecord:" + e, e); logger.error("appendDeleteRecord:" + e, e);
} catch (Throwable e) { } catch (Throwable e) {
result.fail(e); if (updateCallback != null) {
updateCallback.onUpdate(id, false);
}
logger.error("appendDeleteRecord:" + e, e); logger.error("appendDeleteRecord:" + e, e);
setErrorCondition(callback, null, e); setErrorCondition(callback, null, e);
} finally { } finally {
@ -1203,8 +1239,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
} }
}); });
result.get();
} }
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
@ -1266,45 +1300,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
}); });
} }
private boolean checkKnownRecordID(final long id, boolean strict) throws Exception {
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) {
return true;
}
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
// retry on the append thread. maybe the appender thread is not keeping up.
appendExecutor.execute(new Runnable() {
@Override
public void run() {
try {
journalLock.readLock().lock();
try {
known.set(records.containsKey(id)
|| pendingRecords.contains(id)
|| (compactor != null && compactor.containsRecord(id)));
} finally {
journalLock.readLock().unlock();
}
} catch (Throwable t) {
known.fail(t);
throw t;
}
}
});
if (!known.get()) {
if (strict) {
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
}
return false;
} else {
return true;
}
}
private void checkJournalIsLoaded() throws Exception { private void checkJournalIsLoaded() throws Exception {
if (state != JournalState.LOADED && state != JournalState.SYNCING) { if (state != JournalState.LOADED && state != JournalState.SYNCING) {
throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]"); throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");

View File

@ -198,15 +198,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void storeReference(long queueID, long messageID, boolean last) throws Exception; void storeReference(long queueID, long messageID, boolean last) throws Exception;
boolean deleteMessage(long messageID) throws Exception; void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception; void storeAcknowledge(long queueID, long messageID) throws Exception;
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception; void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
boolean updateDeliveryCount(MessageReference ref) throws Exception; void updateDeliveryCount(MessageReference ref) throws Exception;
boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception; void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception; void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;

View File

@ -360,7 +360,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void confirmPendingLargeMessage(long recordID) throws Exception { public void confirmPendingLargeMessage(long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, true, getContext()); messageJournal.tryAppendDeleteRecord(recordID, true, this::messageUpdateCallback, getContext());
} }
} }
@ -385,7 +385,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception { public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional)); messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, this::messageUpdateCallback, getContext(last && syncNonTransactional));
} }
} }
@ -428,7 +428,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception { public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional)); messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
} }
} }
@ -442,21 +442,35 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} }
@Override @Override
public boolean deleteMessage(final long messageID) throws Exception { public void deleteMessage(final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
// Messages are deleted on postACK, one after another. // Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor // If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes. // increasing chances of losing deletes.
// The StorageManager should verify messages without references // The StorageManager should verify messages without references
return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false)); messageJournal.tryAppendDeleteRecord(messageID, false, this::messageUpdateCallback, getContext(false));
}
}
private void messageUpdateCallback(long id, boolean found) {
if (!found) {
ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(new Exception(), id);
}
}
private void recordNotFoundCallback(long id, boolean found) {
if (!found) {
if (logger.isDebugEnabled()) {
logger.debug("Record " + id + " not found");
}
} }
} }
@Override @Override
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional));
} }
} }
@ -472,7 +486,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteDuplicateID(final long recordID) throws Exception { public void deleteDuplicateID(final long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional)); messageJournal.tryAppendDeleteRecord(recordID, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional));
} }
} }
@ -546,7 +560,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deletePageComplete(long ackID) throws Exception { public void deletePageComplete(long ackID) throws Exception {
messageJournal.appendDeleteRecord(ackID, false); messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false);
} }
@Override @Override
@ -558,7 +572,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteCursorAcknowledge(long ackID) throws Exception { public void deleteCursorAcknowledge(long ackID) throws Exception {
messageJournal.appendDeleteRecord(ackID, false); messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false);
} }
@Override @Override
@ -574,14 +588,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteHeuristicCompletion(final long id) throws Exception { public void deleteHeuristicCompletion(final long id) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(id, true, getContext(true)); messageJournal.tryAppendDeleteRecord(id, true, this::recordNotFoundCallback, getContext(true));
} }
} }
@Override @Override
public void deletePageTransactional(final long recordID) throws Exception { public void deletePageTransactional(final long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, false); messageJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, false);
} }
} }
@ -677,18 +691,18 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
// Other operations // Other operations
@Override @Override
public boolean updateDeliveryCount(final MessageReference ref) throws Exception { public void updateDeliveryCount(final MessageReference ref) throws Exception {
// no need to store if it's the same value // no need to store if it's the same value
// otherwise the journal will get OME in case of lots of redeliveries // otherwise the journal will get OME in case of lots of redeliveries
if (ref.getDeliveryCount() == ref.getPersistedCount()) { if (ref.getDeliveryCount() == ref.getPersistedCount()) {
return true; return;
} }
ref.setPersistedCount(ref.getDeliveryCount()); ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount()); DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
} }
} }
@ -741,7 +755,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName); PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName);
if (oldDivert != null) { if (oldDivert != null) {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false); bindingsJournal.tryAppendDeleteRecord(oldDivert.getStoreId(), this::recordNotFoundCallback, false);
} }
} }
} }
@ -767,7 +781,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistedUser oldUser = mapPersistedUsers.remove(username); PersistedUser oldUser = mapPersistedUsers.remove(username);
if (oldUser != null) { if (oldUser != null) {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldUser.getStoreId(), false); bindingsJournal.tryAppendDeleteRecord(oldUser.getStoreId(), this::recordNotFoundCallback, false);
} }
} }
} }
@ -793,7 +807,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistedRole oldRole = mapPersistedRoles.remove(username); PersistedRole oldRole = mapPersistedRoles.remove(username);
if (oldRole != null) { if (oldRole != null) {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldRole.getStoreId(), false); bindingsJournal.tryAppendDeleteRecord(oldRole.getStoreId(), this::recordNotFoundCallback, false);
} }
} }
} }
@ -813,7 +827,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteID(long journalD) throws Exception { public void deleteID(long journalD) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(journalD, false); bindingsJournal.tryAppendDeleteRecord(journalD, this::recordNotFoundCallback, false);
} }
} }
@ -822,7 +836,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
if (oldSetting != null) { if (oldSetting != null) {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false); bindingsJournal.tryAppendDeleteRecord(oldSetting.getStoreId(), this::recordNotFoundCallback, false);
} }
} }
} }
@ -832,7 +846,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch); PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch);
if (oldRoles != null) { if (oldRoles != null) {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false); bindingsJournal.tryAppendDeleteRecord(oldRoles.getStoreId(), this::recordNotFoundCallback, false);
} }
} }
} }
@ -1094,7 +1108,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
sub.reloadACK(encoding.position); sub.reloadACK(encoding.position);
} else { } else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID); ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
} }
@ -1111,7 +1125,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize()); sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
} else { } else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
} }
break; break;
@ -1128,7 +1142,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize()); sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
} else { } else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID()); ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
} }
break; break;
@ -1147,11 +1161,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress()); logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress());
} }
messageJournal.appendDeleteRecord(record.id, false); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
} }
} else { } else {
ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID); ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
} }
break; break;
@ -1332,7 +1346,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteQueueStatus(long recordID) throws Exception { public void deleteQueueStatus(long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(recordID, true); bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true);
} }
} }
@ -1350,7 +1364,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override @Override
public void deleteAddressStatus(long recordID) throws Exception { public void deleteAddressStatus(long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) { try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(recordID, true); bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true);
} }
} }

View File

@ -237,8 +237,7 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public boolean deleteMessage(final long messageID) throws Exception { public void deleteMessage(final long messageID) throws Exception {
return true;
} }
@Override @Override
@ -250,8 +249,7 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
return true;
} }
@Override @Override
@ -263,8 +261,7 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public boolean updateDeliveryCount(final MessageReference ref) throws Exception { public void updateDeliveryCount(final MessageReference ref) throws Exception {
return true;
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -238,12 +239,12 @@ public class ReplicatedJournal implements Journal {
* @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean) * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean)
*/ */
@Override @Override
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { public void tryAppendDeleteRecord(final long id, final JournalUpdateCallback updateCallback, final boolean sync) throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("AppendDelete " + id); log.trace("AppendDelete " + id);
} }
replicationManager.appendDeleteRecord(journalID, id); replicationManager.appendDeleteRecord(journalID, id);
return localJournal.tryAppendDeleteRecord(id, sync); localJournal.tryAppendDeleteRecord(id, updateCallback, sync);
} }
@Override @Override
@ -258,14 +259,15 @@ public class ReplicatedJournal implements Journal {
} }
@Override @Override
public boolean tryAppendDeleteRecord(final long id, public void tryAppendDeleteRecord(final long id,
final boolean sync, final boolean sync,
final JournalUpdateCallback updateCallback,
final IOCompletion completionCallback) throws Exception { final IOCompletion completionCallback) throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("AppendDelete " + id); log.trace("AppendDelete " + id);
} }
replicationManager.appendDeleteRecord(journalID, id); replicationManager.appendDeleteRecord(journalID, id);
return localJournal.tryAppendDeleteRecord(id, sync, completionCallback); localJournal.tryAppendDeleteRecord(id, sync, updateCallback, completionCallback);
} }
/** /**
* @param txID * @param txID
@ -395,12 +397,13 @@ public class ReplicatedJournal implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte recordType, final byte recordType,
final byte[] record, final byte[] record,
final JournalUpdateCallback updateCallback,
final boolean sync) throws Exception { final boolean sync) throws Exception {
return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
} }
/** /**
@ -425,16 +428,17 @@ public class ReplicatedJournal implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte recordType, final byte recordType,
final Persister persister, final Persister persister,
final Object record, final Object record,
final JournalUpdateCallback updateCallback,
final boolean sync) throws Exception { final boolean sync) throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType); log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
} }
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record); replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync); localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync);
} }
@Override @Override
@ -452,17 +456,18 @@ public class ReplicatedJournal implements Journal {
} }
@Override @Override
public boolean tryAppendUpdateRecord(final long id, public void tryAppendUpdateRecord(final long id,
final byte journalRecordType, final byte journalRecordType,
final Persister persister, final Persister persister,
final Object record, final Object record,
final boolean sync, final boolean sync,
final JournalUpdateCallback updateCallback,
final IOCompletion completionCallback) throws Exception { final IOCompletion completionCallback) throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType); log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
} }
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record); replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback); localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, updateCallback, completionCallback);
} }
/** /**

View File

@ -730,7 +730,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
*/ */
private void handleAppendDelete(final ReplicationDeleteMessage packet) throws Exception { private void handleAppendDelete(final ReplicationDeleteMessage packet) throws Exception {
Journal journalToUse = getJournal(packet.getJournalID()); Journal journalToUse = getJournal(packet.getJournalID());
journalToUse.appendDeleteRecord(packet.getId(), noSync); journalToUse.tryAppendDeleteRecord(packet.getId(), null, noSync);
} }
/** /**

View File

@ -1077,9 +1077,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorDecrementingRefCount(@Cause Throwable e); void errorDecrementingRefCount(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222153, value = "Unable to remove message id = {0} please remove manually", @Message(id = 222153, value = "Cannot locate record for message id = {0} on Journal",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void errorRemovingMessage(@Cause Throwable e, Long messageID); void cannotFindMessageOnJournal(@Cause Throwable e, Long messageID);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222154, value = "Error checking DLQ", @Message(id = 222154, value = "Error checking DLQ",

View File

@ -3278,9 +3278,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
if (!storageManager.updateDeliveryCount(reference)) { storageManager.updateDeliveryCount(reference);
return new Pair<>(false, false);
}
} }
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@ -3920,11 +3918,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// as we can't delete each messaging with sync=true while adding messages transactionally. // as we can't delete each messaging with sync=true while adding messages transactionally.
// There is a startup check to remove non referenced messages case these deletes fail // There is a startup check to remove non referenced messages case these deletes fail
try { try {
if (!storageManager.deleteMessage(message.getMessageID())) { storageManager.deleteMessage(message.getMessageID());
ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID());
}
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID()); ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(e, message.getMessageID());
} }
} }
} }

View File

@ -370,8 +370,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean deleteMessage(long messageID) throws Exception { public void deleteMessage(long messageID) throws Exception {
return true;
} }
@Override @Override
@ -385,13 +384,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean updateDeliveryCount(MessageReference ref) throws Exception { public void updateDeliveryCount(MessageReference ref) throws Exception {
return true;
} }
@Override @Override
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
return true;
} }
@Override @Override

View File

@ -441,8 +441,8 @@ public class SendAckFailTest extends SpawnedTestBase {
} }
@Override @Override
public boolean deleteMessage(long messageID) throws Exception { public void deleteMessage(long messageID) throws Exception {
return manager.deleteMessage(messageID); manager.deleteMessage(messageID);
} }
@Override @Override
@ -456,13 +456,13 @@ public class SendAckFailTest extends SpawnedTestBase {
} }
@Override @Override
public boolean updateDeliveryCount(MessageReference ref) throws Exception { public void updateDeliveryCount(MessageReference ref) throws Exception {
return manager.updateDeliveryCount(ref); manager.updateDeliveryCount(ref);
} }
@Override @Override
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
return manager.updateScheduledDeliveryTime(ref); manager.updateScheduledDeliveryTime(ref);
} }
@Override @Override

View File

@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
protected JournalStorageManager createJournalStorageManager(Configuration configuration) { protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) { return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
@Override @Override
public boolean deleteMessage(final long messageID) throws Exception { public void deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID); deletedMessage.add(messageID);
return super.deleteMessage(messageID); super.deleteMessage(messageID);
} }
}; };
} }

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -696,12 +697,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, public void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
Persister persister, Persister persister,
Object record, Object record, JournalUpdateCallback updateCallback,
boolean sync) throws Exception { boolean sync) throws Exception {
return true;
} }
@Override @Override
@ -715,13 +715,12 @@ public final class ReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, public void tryAppendUpdateRecord(long id,
byte recordType, byte recordType,
Persister persister, Persister persister,
Object record, Object record,
boolean sync, boolean sync, JournalUpdateCallback updateCallback,
IOCompletion callback) throws Exception { IOCompletion callback) throws Exception {
return true;
} }
@Override @Override
@ -795,8 +794,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateConsumer, boolean sync) throws Exception {
return true;
} }
@Override @Override
@ -846,8 +844,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
return true;
} }
@Override @Override
@ -951,8 +948,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception {
return true;
} }
@Override @Override

View File

@ -39,9 +39,8 @@ public class FakeStorageManager extends NullStorageManager {
} }
@Override @Override
public boolean deleteMessage(final long messageID) throws Exception { public void deleteMessage(final long messageID) throws Exception {
messageIds.remove(messageID); messageIds.remove(messageID);
return true;
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal; import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal; import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal;
@ -41,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
@ -418,13 +420,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
beforeJournalOperation(); beforeJournalOperation();
boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync); SimpleFutureImpl<Boolean> future = new SimpleFutureImpl();
if (result) { journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync);
if (future.get()) {
Assert.fail();
records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0)); records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
} }
return result; return future.get();
} }
protected void update(final long... arguments) throws Exception { protected void update(final long... arguments) throws Exception {
@ -456,15 +461,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected boolean tryDelete(final long argument) throws Exception { protected boolean tryDelete(final long argument) throws Exception {
beforeJournalOperation(); beforeJournalOperation();
boolean result = journal.tryAppendDeleteRecord(argument, sync); AtomicBoolean result = new AtomicBoolean(true);
journal.tryAppendDeleteRecord(argument, (t, b) -> result.set(b), sync);
if (result) { if (result.get()) {
removeRecordsForID(argument); removeRecordsForID(argument);
} }
journal.debugWait(); journal.debugWait();
return result; return result.get();
} }
protected void addTx(final long txID, final long... arguments) throws Exception { protected void addTx(final long txID, final long... arguments) throws Exception {