diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 181a61c419..f87d7a7f3b 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -501,6 +501,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { appendRecord(r); } + @Override + public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + appendUpdateRecord(id, recordType, record, sync); + return true; + } + @Override public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); @@ -516,6 +522,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { appendRecord(r); } + @Override + public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { + appendUpdateRecord(id, recordType, persister, record, sync); + return true; + } + @Override public void appendUpdateRecord(long id, byte recordType, @@ -539,6 +551,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { appendRecord(r); } + + @Override + public boolean tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion completionCallback) throws Exception { + appendUpdateRecord(id, recordType, persister, record, sync, completionCallback); + return true; + } + + @Override public void appendDeleteRecord(long id, boolean sync) throws Exception { checkStatus(); @@ -553,6 +578,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { appendRecord(r); } + @Override + public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { + appendDeleteRecord(id, sync); + return true; + } + @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { checkStatus(completionCallback); @@ -569,6 +600,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { appendRecord(r); } + @Override + public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { + appendDeleteRecord(id, sync, completionCallback); + return true; + } + @Override public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { checkStatus(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 15ba4d3a04..473ea1c3f2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -85,20 +85,36 @@ public interface Journal extends ActiveMQComponent { void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; + boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; + default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); } + default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + } + void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; + boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; + default void appendUpdateRecord(long id, - byte recordType, - EncodingSupport record, - boolean sync, - IOCompletion completionCallback) throws Exception { + byte recordType, + EncodingSupport record, + boolean sync, + IOCompletion completionCallback) throws Exception { appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); } + default boolean tryAppendUpdateRecord(long id, + byte recordType, + EncodingSupport record, + boolean sync, + IOCompletion completionCallback) throws Exception { + return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + } + void appendUpdateRecord(long id, byte recordType, Persister persister, @@ -106,10 +122,21 @@ public interface Journal extends ActiveMQComponent { boolean sync, IOCompletion callback) throws Exception; + boolean tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws Exception; + void appendDeleteRecord(long id, boolean sync) throws Exception; + boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception; + void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; + boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception; + // Transactional operations void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 85f55dc774..c76ed68d32 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -172,6 +172,13 @@ public final class FileWrapperJournal extends JournalBase { writeRecord(deleteRecord, false, -1, false, callback); } + + @Override + public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception { + appendDeleteRecord(id, sync, callback); + return true; + } + @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); @@ -199,6 +206,18 @@ public final class FileWrapperJournal extends JournalBase { writeRecord(updateRecord, false, -1, false, callback); } + @Override + public boolean tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws Exception { + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); + writeRecord(updateRecord, false, -1, false, callback); + return true; + } + @Override public void appendUpdateRecordTransactional(long txID, long id, diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index 2c03f928b2..8c7a89b56b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -71,12 +71,20 @@ abstract class JournalBase implements Journal { @Override public void appendUpdateRecord(final long id, - final byte recordType, - final byte[] record, - final boolean sync) throws Exception { + final byte recordType, + final byte[] record, + final boolean sync) throws Exception { appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); } + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte recordType, + final byte[] record, + final boolean sync) throws Exception { + return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); + } + @Override public void appendUpdateRecordTransactional(final long txID, final long id, @@ -136,6 +144,23 @@ abstract class JournalBase implements Journal { } } + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync) throws Exception { + SyncIOCompletion callback = getSyncCallback(sync); + + boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback); + + if (callback != null) { + callback.waitCompletion(); + } + + return append; + } + @Override public void appendRollbackRecord(final long txID, final boolean sync) throws Exception { SyncIOCompletion syncCompletion = getSyncCallback(sync); @@ -159,6 +184,18 @@ abstract class JournalBase implements Journal { } } + @Override + public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { + SyncIOCompletion callback = getSyncCallback(sync); + + boolean result = tryAppendDeleteRecord(id, sync, callback); + + if (callback != null) { + callback.waitCompletion(); + } + + return result; + } abstract void scheduleReclaim(); protected SyncIOCompletion getSyncCallback(final boolean sync) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 11617564e7..d5d4a81e6f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -884,7 +884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final IOCompletion callback) throws Exception { checkJournalIsLoaded(); lineUpContext(callback); - checkKnownRecordID(id); + checkKnownRecordID(id, true); if (logger.isTraceEnabled()) { logger.trace("scheduling appendUpdateRecord::id=" + id + @@ -892,8 +892,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal recordType); } - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); + } + + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync, + final IOCompletion callback) throws Exception { + checkJournalIsLoaded(); + lineUpContext(callback); + + if (!checkKnownRecordID(id, false)) { + if (callback != null) { + callback.done(); + } + return false; + } + + if (logger.isTraceEnabled()) { + logger.trace("scheduling appendUpdateRecord::id=" + id + + ", userRecordType=" + + recordType); + } + + + internalAppendUpdateRecord(id, recordType, persister, record, sync, callback); + + return true; + } + + + private void internalAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -946,8 +985,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal checkJournalIsLoaded(); lineUpContext(callback); - checkKnownRecordID(id); + checkKnownRecordID(id, true); + internalAppendDeleteRecord(id, sync, callback); + return; + } + + + @Override + public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { + + if (logger.isTraceEnabled()) { + logger.trace("scheduling appendDeleteRecord::id=" + id); + } + + + checkJournalIsLoaded(); + lineUpContext(callback); + if (!checkKnownRecordID(id, false)) { + if (callback != null) { + callback.done(); + } + return false; + } + + internalAppendDeleteRecord(id, sync, callback); + return true; + } + + private void internalAppendDeleteRecord(long id, + boolean sync, + IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override @@ -1055,9 +1123,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal }); } - private void checkKnownRecordID(final long id) throws Exception { + private boolean checkKnownRecordID(final long id, boolean strict) throws Exception { if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) { - return; + return true; } final SimpleFuture known = new SimpleFutureImpl<>(); @@ -1079,7 +1147,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal }); if (!known.get()) { - throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); + if (strict) { + throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); + } + return false; + } else { + return true; } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 4509210806..3a106cca1a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1048,6 +1048,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { + + if (!dest.isTemporary()) { + // this should not really happen, + // so I'm not creating a Logger for this + logger.warn("OpenWire client sending a queue remove towards " + dest.getPhysicalName()); + } try { server.destroyQueue(new SimpleString(dest.getPhysicalName()), getRemotingConnection()); } catch (ActiveMQNonExistentQueueException neq) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 39183cde1f..5bb91c0115 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -194,15 +194,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void storeReference(long queueID, long messageID, boolean last) throws Exception; - void deleteMessage(long messageID) throws Exception; + boolean deleteMessage(long messageID) throws Exception; void storeAcknowledge(long queueID, long messageID) throws Exception; void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception; - void updateDeliveryCount(MessageReference ref) throws Exception; + boolean updateDeliveryCount(MessageReference ref) throws Exception; - void updateScheduledDeliveryTime(MessageReference ref) throws Exception; + boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception; void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index fdb1e8cfd4..33fc7197ab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -425,25 +425,25 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public void deleteMessage(final long messageID) throws Exception { + public boolean deleteMessage(final long messageID) throws Exception { readLock(); try { // Messages are deleted on postACK, one after another. // If these deletes are synchronized, we would build up messages on the Executor // increasing chances of losing deletes. // The StorageManager should verify messages without references - messageJournal.appendDeleteRecord(messageID, false, getContext(false)); + return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false)); } finally { readUnLock(); } } @Override - public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { + public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); readLock(); try { - messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); + return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); } finally { readUnLock(); } @@ -725,11 +725,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp // Other operations @Override - public void updateDeliveryCount(final MessageReference ref) throws Exception { + public boolean updateDeliveryCount(final MessageReference ref) throws Exception { // no need to store if it's the same value // otherwise the journal will get OME in case of lots of redeliveries if (ref.getDeliveryCount() == ref.getPersistedCount()) { - return; + return true; } ref.setPersistedCount(ref.getDeliveryCount()); @@ -737,7 +737,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp readLock(); try { - messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); + return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); } finally { readUnLock(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 72953fa0ee..a4a3f8ab98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -226,7 +226,8 @@ public class NullStorageManager implements StorageManager { } @Override - public void deleteMessage(final long messageID) throws Exception { + public boolean deleteMessage(final long messageID) throws Exception { + return true; } @Override @@ -238,7 +239,8 @@ public class NullStorageManager implements StorageManager { } @Override - public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { + public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { + return true; } @Override @@ -250,7 +252,8 @@ public class NullStorageManager implements StorageManager { } @Override - public void updateDeliveryCount(final MessageReference ref) throws Exception { + public boolean updateDeliveryCount(final MessageReference ref) throws Exception { + return true; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index f81a31bf19..e66e9b3ad4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -207,6 +207,21 @@ public class ReplicatedJournal implements Journal { localJournal.appendDeleteRecord(id, sync); } + /** + * @param id + * @param sync + * @throws Exception + * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean) + */ + @Override + public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception { + if (log.isTraceEnabled()) { + log.trace("AppendDelete " + id); + } + replicationManager.appendDeleteRecord(journalID, id); + return localJournal.tryAppendDeleteRecord(id, sync); + } + @Override public void appendDeleteRecord(final long id, final boolean sync, @@ -218,6 +233,16 @@ public class ReplicatedJournal implements Journal { localJournal.appendDeleteRecord(id, sync, completionCallback); } + @Override + public boolean tryAppendDeleteRecord(final long id, + final boolean sync, + final IOCompletion completionCallback) throws Exception { + if (log.isTraceEnabled()) { + log.trace("AppendDelete " + id); + } + replicationManager.appendDeleteRecord(journalID, id); + return localJournal.tryAppendDeleteRecord(id, sync, completionCallback); + } /** * @param txID * @param id @@ -345,6 +370,15 @@ public class ReplicatedJournal implements Journal { this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); } + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte recordType, + final byte[] record, + final boolean sync) throws Exception { + + return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); + } + /** * @param id * @param recordType @@ -366,6 +400,19 @@ public class ReplicatedJournal implements Journal { localJournal.appendUpdateRecord(id, recordType, persister, record, sync); } + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync) throws Exception { + if (log.isTraceEnabled()) { + log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType); + } + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record); + return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync); + } + @Override public void appendUpdateRecord(final long id, final byte journalRecordType, @@ -380,6 +427,20 @@ public class ReplicatedJournal implements Journal { localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback); } + @Override + public boolean tryAppendUpdateRecord(final long id, + final byte journalRecordType, + final Persister persister, + final Object record, + final boolean sync, + final IOCompletion completionCallback) throws Exception { + if (log.isTraceEnabled()) { + log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType); + } + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record); + return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback); + } + /** * @param txID * @param id diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7d798b74ad..d7c93adbfa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3093,7 +3093,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public Pair checkRedelivery(final MessageReference reference, final long timeBase, final boolean ignoreRedeliveryDelay) throws Exception { - Message message = reference.getMessage(); if (internalQueue) { if (logger.isTraceEnabled()) { @@ -3104,7 +3103,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { - storageManager.updateDeliveryCount(reference); + if (!storageManager.updateDeliveryCount(reference)) { + return new Pair<>(false, false); + } } AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); @@ -3739,7 +3740,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // as we can't delete each messaging with sync=true while adding messages transactionally. // There is a startup check to remove non referenced messages case these deletes fail try { - storageManager.deleteMessage(message.getMessageID()); + if (!storageManager.deleteMessage(message.getMessageID())) { + ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID()); + } } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID()); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 471885c079..f740ba855a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -354,8 +354,8 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void deleteMessage(long messageID) throws Exception { - + public boolean deleteMessage(long messageID) throws Exception { + return true; } @Override @@ -369,13 +369,13 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override - public void updateDeliveryCount(MessageReference ref) throws Exception { - + public boolean updateDeliveryCount(MessageReference ref) throws Exception { + return true; } @Override - public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { - + public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { + return true; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java new file mode 100644 index 0000000000..663e3f12e9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ForceDeleteQueue extends ActiveMQTestBase { + + ActiveMQServer server; + String protocol = "openwire"; + String uri = "tcp://localhost:61616"; + + public ForceDeleteQueue(String protocol) { + this.protocol = protocol; + } + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection data() { + Object[][] params = new Object[][]{{"openwire"}, {"core"}, {"amqp"}}; + return Arrays.asList(params); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + if (protocol.equals("openwire")) { + uri = "tcp://localhost:61616?jms.prefetchPolicy.all=5000"; + } + + server = createServer(true, true); + server.getAddressSettingsRepository().addMatch("#", + new AddressSettings().setMaxDeliveryAttempts(2)); + + server.start(); + } + + @Test + public void testForceDelete() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("testForceDelete"); + server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, uri); + Connection conn = factory.createConnection(); + + AssertionLoggerHandler.startCapture(); + try { + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName.toString()); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 1000; i++) { + TextMessage message = session.createTextMessage("Text " + i); + producer.send(message); + } + session.commit(); + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName); + + Wait.assertEquals(1000, serverQueue::getMessageCount); + + conn.close(); + + conn = factory.createConnection(); + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + LinkedListIterator queueiterator = serverQueue.browserIterator(); + ArrayList listQueue = new ArrayList<>(1000); + + while (queueiterator.hasNext()) { + MessageReference ref = queueiterator.next(); + + listQueue.add(ref.getMessageID()); + } + + queueiterator.close(); + + MessageConsumer consumer = session.createConsumer(queue); + + Wait.assertTrue(() -> serverQueue.getDeliveringCount() > 100); + + for (Long l : listQueue) { + // this is forcing an artificial situation where the message was removed during a failure condition + server.getStorageManager().deleteMessage(l); + } + + server.destroyQueue(queueName, null, false); + + for (RemotingConnection connection : server.getRemotingService().getConnections()) { + connection.fail(new ActiveMQException("failure")); + } + + + Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add info")); + + + } finally { + AssertionLoggerHandler.stopCapture(); + try { + conn.close(); + } catch (Throwable ignored) { + } + } + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index f2f63f5242..3980b3bb71 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -433,8 +433,8 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public void deleteMessage(long messageID) throws Exception { - manager.deleteMessage(messageID); + public boolean deleteMessage(long messageID) throws Exception { + return manager.deleteMessage(messageID); } @Override @@ -448,13 +448,13 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public void updateDeliveryCount(MessageReference ref) throws Exception { - manager.updateDeliveryCount(ref); + public boolean updateDeliveryCount(MessageReference ref) throws Exception { + return manager.updateDeliveryCount(ref); } @Override - public void updateScheduledDeliveryTime(MessageReference ref) throws Exception { - manager.updateScheduledDeliveryTime(ref); + public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception { + return manager.updateScheduledDeliveryTime(ref); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java index e69d4d9b33..3fc2a05561 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java @@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase { protected JournalStorageManager createJournalStorageManager(Configuration configuration) { return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) { @Override - public void deleteMessage(final long messageID) throws Exception { + public boolean deleteMessage(final long messageID) throws Exception { deletedMessage.add(messageID); - super.deleteMessage(messageID); + return super.deleteMessage(messageID); } }; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 82165f0644..ce56b9beed 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -649,6 +649,15 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public boolean tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync) throws Exception { + return true; + } + @Override public void appendUpdateRecord(long id, byte recordType, @@ -659,6 +668,16 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public boolean tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws Exception { + return true; + } + @Override public void appendAddRecordTransactional(long txID, long id, @@ -729,6 +748,11 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception { + return true; + } + @Override public void appendDeleteRecordTransactional(final long txID, final long id, @@ -775,6 +799,11 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + return true; + } + @Override public void appendUpdateRecord(final long id, final byte recordType, @@ -875,6 +904,11 @@ public final class ReplicationTest extends ActiveMQTestBase { final IOCompletion completionCallback) throws Exception { } + @Override + public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { + return true; + } + @Override public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java index 67cfe18f41..c38bc453c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java @@ -39,8 +39,9 @@ public class FakeStorageManager extends NullStorageManager { } @Override - public void deleteMessage(final long messageID) throws Exception { + public boolean deleteMessage(final long messageID) throws Exception { messageIds.remove(messageID); + return true; } @Override diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml index 4a9eb47619..eb226e6005 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml @@ -117,6 +117,11 @@ under the License. +
+ + + +
diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml index 58a4b9fd74..4d62a6ac71 100644 --- a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml @@ -119,6 +119,11 @@ under the License.
+
+ + + +
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java index 954e85b3cf..7da070c1f4 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -41,6 +41,10 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class SoakPagingTest extends SmokeTestBase { + public static final int LAG_CONSUMER_TIME = 1000; + public static final int TIME_RUNNING = 4000; + public static final int CLIENT_KILLS = 2; + String protocol; String consumerType; boolean transaction; @@ -86,12 +90,13 @@ public class SoakPagingTest extends SmokeTestBase { private static ConnectionFactory createConnectionFactory(String protocol, String uri) { if (protocol.toUpperCase().equals("OPENWIRE")) { - return new org.apache.activemq.ActiveMQConnectionFactory(uri); + return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")"); } else if (protocol.toUpperCase().equals("AMQP")) { if (uri.startsWith("tcp://")) { // replacing tcp:// by amqp:// uri = "amqp" + uri.substring(3); + } return new JmsConnectionFactory(uri); } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { @@ -158,25 +163,14 @@ public class SoakPagingTest extends SmokeTestBase { @Test public void testPagingReplication() throws Throwable { - Process queueProcess = null; - if (consumerType.equals("queue")) { - queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction); - } + server1 = startServer(SERVER_NAME_1, 0, 30000); - for (int i = 0; i < 3; i++) { - Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction); - - if (i == 0) { - server1 = startServer(SERVER_NAME_1, 0, 30000); - } + for (int i = 0; i < CLIENT_KILLS; i++) { + Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "" + TIME_RUNNING, "" + transaction); int result = process.waitFor(); Assert.assertTrue(result > 0); } - - if (queueProcess != null) { - Assert.assertTrue(queueProcess.waitFor() > 0); - } } public void produce(ConnectionFactory factory) { @@ -261,7 +255,8 @@ public class SoakPagingTest extends SmokeTestBase { messageConsumer = session.createConsumer(address); } - Thread.sleep(5000); + if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME); + connection.start(); int i = 0; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index be72ee6060..d8dc8fcfad 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -385,6 +385,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.debugWait(); } + protected boolean tryUpdate(final long argument) throws Exception { + byte[] updateRecord = generateRecord(recordLength); + + beforeJournalOperation(); + + boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync); + + if (result) { + records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0)); + } + + return result; + } + protected void update(final long... arguments) throws Exception { for (long element : arguments) { byte[] updateRecord = generateRecord(recordLength); @@ -411,6 +425,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.debugWait(); } + protected boolean tryDelete(final long argument) throws Exception { + beforeJournalOperation(); + + boolean result = journal.tryAppendDeleteRecord(argument, sync); + + if (result) { + removeRecordsForID(argument); + } + + journal.debugWait(); + + return result; + } + protected void addTx(final long txID, final long... arguments) throws Exception { TransactionHolder tx = getTransaction(txID); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 25b2413845..838f334231 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -2688,6 +2688,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { loadAndCheck(); } + @Test + public void testTryIsolation2() throws Exception { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + addTx(1, 1, 2, 3); + + Assert.assertFalse(tryUpdate(1)); + + stopJournal(); + createJournal(); + startJournal(); + loadAndCheck(); + } + @Test public void testIsolation3() throws Exception { setup(10, 10 * 1024, true); @@ -2708,6 +2724,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { loadAndCheck(); } + @Test + public void testTryDelete() throws Exception { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + addTx(1, 1, 2, 3); + + Assert.assertFalse(tryDelete(1)); + + stopJournal(); + createJournal(); + startJournal(); + loadAndCheck(); + } + // XA tests // ========