This closes #3067
This commit is contained in:
commit
b2342b626e
|
@ -501,6 +501,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||||
|
appendUpdateRecord(id, recordType, record, sync);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
|
public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
||||||
|
@ -516,6 +522,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
|
||||||
|
appendUpdateRecord(id, recordType, persister, record, sync);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(long id,
|
public void appendUpdateRecord(long id,
|
||||||
byte recordType,
|
byte recordType,
|
||||||
|
@ -539,6 +551,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion completionCallback) throws Exception {
|
||||||
|
appendUpdateRecord(id, recordType, persister, record, sync, completionCallback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(long id, boolean sync) throws Exception {
|
public void appendDeleteRecord(long id, boolean sync) throws Exception {
|
||||||
checkStatus();
|
checkStatus();
|
||||||
|
@ -553,6 +578,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
|
||||||
|
appendDeleteRecord(id, sync);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
||||||
checkStatus(completionCallback);
|
checkStatus(completionCallback);
|
||||||
|
@ -569,6 +600,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
||||||
|
appendDeleteRecord(id, sync, completionCallback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
||||||
checkStatus();
|
checkStatus();
|
||||||
|
|
|
@ -85,20 +85,36 @@ public interface Journal extends ActiveMQComponent {
|
||||||
|
|
||||||
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
||||||
|
|
||||||
|
boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
||||||
|
|
||||||
default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
||||||
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
|
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
||||||
|
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
|
||||||
|
}
|
||||||
|
|
||||||
void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
|
void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
|
||||||
|
|
||||||
|
boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
|
||||||
|
|
||||||
default void appendUpdateRecord(long id,
|
default void appendUpdateRecord(long id,
|
||||||
byte recordType,
|
byte recordType,
|
||||||
EncodingSupport record,
|
EncodingSupport record,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion completionCallback) throws Exception {
|
IOCompletion completionCallback) throws Exception {
|
||||||
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
|
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
EncodingSupport record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion completionCallback) throws Exception {
|
||||||
|
return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
|
||||||
|
}
|
||||||
|
|
||||||
void appendUpdateRecord(long id,
|
void appendUpdateRecord(long id,
|
||||||
byte recordType,
|
byte recordType,
|
||||||
Persister persister,
|
Persister persister,
|
||||||
|
@ -106,10 +122,21 @@ public interface Journal extends ActiveMQComponent {
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws Exception;
|
IOCompletion callback) throws Exception;
|
||||||
|
|
||||||
|
boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion callback) throws Exception;
|
||||||
|
|
||||||
void appendDeleteRecord(long id, boolean sync) throws Exception;
|
void appendDeleteRecord(long id, boolean sync) throws Exception;
|
||||||
|
|
||||||
|
boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception;
|
||||||
|
|
||||||
void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
|
void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
|
||||||
|
|
||||||
|
boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
|
||||||
|
|
||||||
// Transactional operations
|
// Transactional operations
|
||||||
|
|
||||||
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
|
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
|
||||||
|
|
|
@ -172,6 +172,13 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
writeRecord(deleteRecord, false, -1, false, callback);
|
writeRecord(deleteRecord, false, -1, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
|
||||||
|
appendDeleteRecord(id, sync, callback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
||||||
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
|
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
|
||||||
|
@ -199,6 +206,18 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
writeRecord(updateRecord, false, -1, false, callback);
|
writeRecord(updateRecord, false, -1, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion callback) throws Exception {
|
||||||
|
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
|
||||||
|
writeRecord(updateRecord, false, -1, false, callback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecordTransactional(long txID,
|
public void appendUpdateRecordTransactional(long txID,
|
||||||
long id,
|
long id,
|
||||||
|
|
|
@ -71,12 +71,20 @@ abstract class JournalBase implements Journal {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(final long id,
|
public void appendUpdateRecord(final long id,
|
||||||
final byte recordType,
|
final byte recordType,
|
||||||
final byte[] record,
|
final byte[] record,
|
||||||
final boolean sync) throws Exception {
|
final boolean sync) throws Exception {
|
||||||
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte recordType,
|
||||||
|
final byte[] record,
|
||||||
|
final boolean sync) throws Exception {
|
||||||
|
return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecordTransactional(final long txID,
|
public void appendUpdateRecordTransactional(final long txID,
|
||||||
final long id,
|
final long id,
|
||||||
|
@ -136,6 +144,23 @@ abstract class JournalBase implements Journal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte recordType,
|
||||||
|
final Persister persister,
|
||||||
|
final Object record,
|
||||||
|
final boolean sync) throws Exception {
|
||||||
|
SyncIOCompletion callback = getSyncCallback(sync);
|
||||||
|
|
||||||
|
boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback);
|
||||||
|
|
||||||
|
if (callback != null) {
|
||||||
|
callback.waitCompletion();
|
||||||
|
}
|
||||||
|
|
||||||
|
return append;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception {
|
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception {
|
||||||
SyncIOCompletion syncCompletion = getSyncCallback(sync);
|
SyncIOCompletion syncCompletion = getSyncCallback(sync);
|
||||||
|
@ -159,6 +184,18 @@ abstract class JournalBase implements Journal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
|
||||||
|
SyncIOCompletion callback = getSyncCallback(sync);
|
||||||
|
|
||||||
|
boolean result = tryAppendDeleteRecord(id, sync, callback);
|
||||||
|
|
||||||
|
if (callback != null) {
|
||||||
|
callback.waitCompletion();
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
abstract void scheduleReclaim();
|
abstract void scheduleReclaim();
|
||||||
|
|
||||||
protected SyncIOCompletion getSyncCallback(final boolean sync) {
|
protected SyncIOCompletion getSyncCallback(final boolean sync) {
|
||||||
|
|
|
@ -884,7 +884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
final IOCompletion callback) throws Exception {
|
final IOCompletion callback) throws Exception {
|
||||||
checkJournalIsLoaded();
|
checkJournalIsLoaded();
|
||||||
lineUpContext(callback);
|
lineUpContext(callback);
|
||||||
checkKnownRecordID(id);
|
checkKnownRecordID(id, true);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("scheduling appendUpdateRecord::id=" + id +
|
logger.trace("scheduling appendUpdateRecord::id=" + id +
|
||||||
|
@ -892,8 +892,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
recordType);
|
recordType);
|
||||||
}
|
}
|
||||||
|
|
||||||
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte recordType,
|
||||||
|
final Persister persister,
|
||||||
|
final Object record,
|
||||||
|
final boolean sync,
|
||||||
|
final IOCompletion callback) throws Exception {
|
||||||
|
checkJournalIsLoaded();
|
||||||
|
lineUpContext(callback);
|
||||||
|
|
||||||
|
if (!checkKnownRecordID(id, false)) {
|
||||||
|
if (callback != null) {
|
||||||
|
callback.done();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("scheduling appendUpdateRecord::id=" + id +
|
||||||
|
", userRecordType=" +
|
||||||
|
recordType);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void internalAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||||
|
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -946,8 +985,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
|
|
||||||
checkJournalIsLoaded();
|
checkJournalIsLoaded();
|
||||||
lineUpContext(callback);
|
lineUpContext(callback);
|
||||||
checkKnownRecordID(id);
|
checkKnownRecordID(id, true);
|
||||||
|
|
||||||
|
internalAppendDeleteRecord(id, sync, callback);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("scheduling appendDeleteRecord::id=" + id);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
checkJournalIsLoaded();
|
||||||
|
lineUpContext(callback);
|
||||||
|
if (!checkKnownRecordID(id, false)) {
|
||||||
|
if (callback != null) {
|
||||||
|
callback.done();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
internalAppendDeleteRecord(id, sync, callback);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalAppendDeleteRecord(long id,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||||
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1055,9 +1123,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkKnownRecordID(final long id) throws Exception {
|
private boolean checkKnownRecordID(final long id, boolean strict) throws Exception {
|
||||||
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) {
|
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
|
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
|
||||||
|
@ -1079,7 +1147,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!known.get()) {
|
if (!known.get()) {
|
||||||
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
|
if (strict) {
|
||||||
|
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1048,6 +1048,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
public void removeDestination(ActiveMQDestination dest) throws Exception {
|
public void removeDestination(ActiveMQDestination dest) throws Exception {
|
||||||
if (dest.isQueue()) {
|
if (dest.isQueue()) {
|
||||||
|
|
||||||
|
if (!dest.isTemporary()) {
|
||||||
|
// this should not really happen,
|
||||||
|
// so I'm not creating a Logger for this
|
||||||
|
logger.warn("OpenWire client sending a queue remove towards " + dest.getPhysicalName());
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
server.destroyQueue(new SimpleString(dest.getPhysicalName()), getRemotingConnection());
|
server.destroyQueue(new SimpleString(dest.getPhysicalName()), getRemotingConnection());
|
||||||
} catch (ActiveMQNonExistentQueueException neq) {
|
} catch (ActiveMQNonExistentQueueException neq) {
|
||||||
|
|
|
@ -194,15 +194,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
||||||
|
|
||||||
void storeReference(long queueID, long messageID, boolean last) throws Exception;
|
void storeReference(long queueID, long messageID, boolean last) throws Exception;
|
||||||
|
|
||||||
void deleteMessage(long messageID) throws Exception;
|
boolean deleteMessage(long messageID) throws Exception;
|
||||||
|
|
||||||
void storeAcknowledge(long queueID, long messageID) throws Exception;
|
void storeAcknowledge(long queueID, long messageID) throws Exception;
|
||||||
|
|
||||||
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
|
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
|
||||||
|
|
||||||
void updateDeliveryCount(MessageReference ref) throws Exception;
|
boolean updateDeliveryCount(MessageReference ref) throws Exception;
|
||||||
|
|
||||||
void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
|
boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception;
|
||||||
|
|
||||||
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
|
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -425,25 +425,25 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(final long messageID) throws Exception {
|
public boolean deleteMessage(final long messageID) throws Exception {
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
// Messages are deleted on postACK, one after another.
|
// Messages are deleted on postACK, one after another.
|
||||||
// If these deletes are synchronized, we would build up messages on the Executor
|
// If these deletes are synchronized, we would build up messages on the Executor
|
||||||
// increasing chances of losing deletes.
|
// increasing chances of losing deletes.
|
||||||
// The StorageManager should verify messages without references
|
// The StorageManager should verify messages without references
|
||||||
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
|
return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false));
|
||||||
} finally {
|
} finally {
|
||||||
readUnLock();
|
readUnLock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
|
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
|
||||||
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
|
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
|
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
|
||||||
} finally {
|
} finally {
|
||||||
readUnLock();
|
readUnLock();
|
||||||
}
|
}
|
||||||
|
@ -725,11 +725,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
// Other operations
|
// Other operations
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDeliveryCount(final MessageReference ref) throws Exception {
|
public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
|
||||||
// no need to store if it's the same value
|
// no need to store if it's the same value
|
||||||
// otherwise the journal will get OME in case of lots of redeliveries
|
// otherwise the journal will get OME in case of lots of redeliveries
|
||||||
if (ref.getDeliveryCount() == ref.getPersistedCount()) {
|
if (ref.getDeliveryCount() == ref.getPersistedCount()) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ref.setPersistedCount(ref.getDeliveryCount());
|
ref.setPersistedCount(ref.getDeliveryCount());
|
||||||
|
@ -737,7 +737,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
|
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
|
||||||
} finally {
|
} finally {
|
||||||
readUnLock();
|
readUnLock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,7 +226,8 @@ public class NullStorageManager implements StorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(final long messageID) throws Exception {
|
public boolean deleteMessage(final long messageID) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -238,7 +239,8 @@ public class NullStorageManager implements StorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
|
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -250,7 +252,8 @@ public class NullStorageManager implements StorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDeliveryCount(final MessageReference ref) throws Exception {
|
public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -207,6 +207,21 @@ public class ReplicatedJournal implements Journal {
|
||||||
localJournal.appendDeleteRecord(id, sync);
|
localJournal.appendDeleteRecord(id, sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param id
|
||||||
|
* @param sync
|
||||||
|
* @throws Exception
|
||||||
|
* @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("AppendDelete " + id);
|
||||||
|
}
|
||||||
|
replicationManager.appendDeleteRecord(journalID, id);
|
||||||
|
return localJournal.tryAppendDeleteRecord(id, sync);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(final long id,
|
public void appendDeleteRecord(final long id,
|
||||||
final boolean sync,
|
final boolean sync,
|
||||||
|
@ -218,6 +233,16 @@ public class ReplicatedJournal implements Journal {
|
||||||
localJournal.appendDeleteRecord(id, sync, completionCallback);
|
localJournal.appendDeleteRecord(id, sync, completionCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(final long id,
|
||||||
|
final boolean sync,
|
||||||
|
final IOCompletion completionCallback) throws Exception {
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("AppendDelete " + id);
|
||||||
|
}
|
||||||
|
replicationManager.appendDeleteRecord(journalID, id);
|
||||||
|
return localJournal.tryAppendDeleteRecord(id, sync, completionCallback);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* @param txID
|
* @param txID
|
||||||
* @param id
|
* @param id
|
||||||
|
@ -345,6 +370,15 @@ public class ReplicatedJournal implements Journal {
|
||||||
this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte recordType,
|
||||||
|
final byte[] record,
|
||||||
|
final boolean sync) throws Exception {
|
||||||
|
|
||||||
|
return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param id
|
* @param id
|
||||||
* @param recordType
|
* @param recordType
|
||||||
|
@ -366,6 +400,19 @@ public class ReplicatedJournal implements Journal {
|
||||||
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
|
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte recordType,
|
||||||
|
final Persister persister,
|
||||||
|
final Object record,
|
||||||
|
final boolean sync) throws Exception {
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
|
||||||
|
}
|
||||||
|
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
|
||||||
|
return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(final long id,
|
public void appendUpdateRecord(final long id,
|
||||||
final byte journalRecordType,
|
final byte journalRecordType,
|
||||||
|
@ -380,6 +427,20 @@ public class ReplicatedJournal implements Journal {
|
||||||
localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
|
localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(final long id,
|
||||||
|
final byte journalRecordType,
|
||||||
|
final Persister persister,
|
||||||
|
final Object record,
|
||||||
|
final boolean sync,
|
||||||
|
final IOCompletion completionCallback) throws Exception {
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
|
||||||
|
}
|
||||||
|
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
|
||||||
|
return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param txID
|
* @param txID
|
||||||
* @param id
|
* @param id
|
||||||
|
|
|
@ -3093,7 +3093,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
|
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
|
||||||
final long timeBase,
|
final long timeBase,
|
||||||
final boolean ignoreRedeliveryDelay) throws Exception {
|
final boolean ignoreRedeliveryDelay) throws Exception {
|
||||||
Message message = reference.getMessage();
|
|
||||||
|
|
||||||
if (internalQueue) {
|
if (internalQueue) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -3104,7 +3103,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
|
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
|
||||||
storageManager.updateDeliveryCount(reference);
|
if (!storageManager.updateDeliveryCount(reference)) {
|
||||||
|
return new Pair<>(false, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
|
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
|
||||||
|
@ -3739,7 +3740,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
// as we can't delete each messaging with sync=true while adding messages transactionally.
|
// as we can't delete each messaging with sync=true while adding messages transactionally.
|
||||||
// There is a startup check to remove non referenced messages case these deletes fail
|
// There is a startup check to remove non referenced messages case these deletes fail
|
||||||
try {
|
try {
|
||||||
storageManager.deleteMessage(message.getMessageID());
|
if (!storageManager.deleteMessage(message.getMessageID())) {
|
||||||
|
ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID());
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
|
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
|
||||||
}
|
}
|
||||||
|
|
|
@ -354,8 +354,8 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(long messageID) throws Exception {
|
public boolean deleteMessage(long messageID) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -369,13 +369,13 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDeliveryCount(MessageReference ref) throws Exception {
|
public boolean updateDeliveryCount(MessageReference ref) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
|
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class ForceDeleteQueue extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
ActiveMQServer server;
|
||||||
|
String protocol = "openwire";
|
||||||
|
String uri = "tcp://localhost:61616";
|
||||||
|
|
||||||
|
public ForceDeleteQueue(String protocol) {
|
||||||
|
this.protocol = protocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "protocol={0}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
Object[][] params = new Object[][]{{"openwire"}, {"core"}, {"amqp"}};
|
||||||
|
return Arrays.asList(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
if (protocol.equals("openwire")) {
|
||||||
|
uri = "tcp://localhost:61616?jms.prefetchPolicy.all=5000";
|
||||||
|
}
|
||||||
|
|
||||||
|
server = createServer(true, true);
|
||||||
|
server.getAddressSettingsRepository().addMatch("#",
|
||||||
|
new AddressSettings().setMaxDeliveryAttempts(2));
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForceDelete() throws Exception {
|
||||||
|
SimpleString queueName = SimpleString.toSimpleString("testForceDelete");
|
||||||
|
server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
|
||||||
|
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||||
|
|
||||||
|
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, uri);
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
try {
|
||||||
|
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue = session.createQueue(queueName.toString());
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
TextMessage message = session.createTextMessage("Text " + i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
|
||||||
|
|
||||||
|
Wait.assertEquals(1000, serverQueue::getMessageCount);
|
||||||
|
|
||||||
|
conn.close();
|
||||||
|
|
||||||
|
conn = factory.createConnection();
|
||||||
|
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
conn.start();
|
||||||
|
|
||||||
|
LinkedListIterator<MessageReference> queueiterator = serverQueue.browserIterator();
|
||||||
|
ArrayList<Long> listQueue = new ArrayList<>(1000);
|
||||||
|
|
||||||
|
while (queueiterator.hasNext()) {
|
||||||
|
MessageReference ref = queueiterator.next();
|
||||||
|
|
||||||
|
listQueue.add(ref.getMessageID());
|
||||||
|
}
|
||||||
|
|
||||||
|
queueiterator.close();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
Wait.assertTrue(() -> serverQueue.getDeliveringCount() > 100);
|
||||||
|
|
||||||
|
for (Long l : listQueue) {
|
||||||
|
// this is forcing an artificial situation where the message was removed during a failure condition
|
||||||
|
server.getStorageManager().deleteMessage(l);
|
||||||
|
}
|
||||||
|
|
||||||
|
server.destroyQueue(queueName, null, false);
|
||||||
|
|
||||||
|
for (RemotingConnection connection : server.getRemotingService().getConnections()) {
|
||||||
|
connection.fail(new ActiveMQException("failure"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add info"));
|
||||||
|
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
try {
|
||||||
|
conn.close();
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -433,8 +433,8 @@ public class SendAckFailTest extends SpawnedTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(long messageID) throws Exception {
|
public boolean deleteMessage(long messageID) throws Exception {
|
||||||
manager.deleteMessage(messageID);
|
return manager.deleteMessage(messageID);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -448,13 +448,13 @@ public class SendAckFailTest extends SpawnedTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDeliveryCount(MessageReference ref) throws Exception {
|
public boolean updateDeliveryCount(MessageReference ref) throws Exception {
|
||||||
manager.updateDeliveryCount(ref);
|
return manager.updateDeliveryCount(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
|
public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
|
||||||
manager.updateScheduledDeliveryTime(ref);
|
return manager.updateScheduledDeliveryTime(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
|
||||||
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
|
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
|
||||||
return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
|
return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(final long messageID) throws Exception {
|
public boolean deleteMessage(final long messageID) throws Exception {
|
||||||
deletedMessage.add(messageID);
|
deletedMessage.add(messageID);
|
||||||
super.deleteMessage(messageID);
|
return super.deleteMessage(messageID);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -649,6 +649,15 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync) throws Exception {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(long id,
|
public void appendUpdateRecord(long id,
|
||||||
byte recordType,
|
byte recordType,
|
||||||
|
@ -659,6 +668,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id,
|
||||||
|
byte recordType,
|
||||||
|
Persister persister,
|
||||||
|
Object record,
|
||||||
|
boolean sync,
|
||||||
|
IOCompletion callback) throws Exception {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendAddRecordTransactional(long txID,
|
public void appendAddRecordTransactional(long txID,
|
||||||
long id,
|
long id,
|
||||||
|
@ -729,6 +748,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(final long txID,
|
public void appendDeleteRecordTransactional(final long txID,
|
||||||
final long id,
|
final long id,
|
||||||
|
@ -775,6 +799,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(final long id,
|
public void appendUpdateRecord(final long id,
|
||||||
final byte recordType,
|
final byte recordType,
|
||||||
|
@ -875,6 +904,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
final IOCompletion completionCallback) throws Exception {
|
final IOCompletion completionCallback) throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendPrepareRecord(final long txID,
|
public void appendPrepareRecord(final long txID,
|
||||||
final EncodingSupport transactionData,
|
final EncodingSupport transactionData,
|
||||||
|
|
|
@ -39,8 +39,9 @@ public class FakeStorageManager extends NullStorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteMessage(final long messageID) throws Exception {
|
public boolean deleteMessage(final long messageID) throws Exception {
|
||||||
messageIds.remove(messageID);
|
messageIds.remove(messageID);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -117,6 +117,11 @@ under the License.
|
||||||
<multicast>
|
<multicast>
|
||||||
</multicast>
|
</multicast>
|
||||||
</address>
|
</address>
|
||||||
|
<address name="DLQ">
|
||||||
|
<anycast>
|
||||||
|
<queue name="DLQ"/>
|
||||||
|
</anycast>
|
||||||
|
</address>
|
||||||
<address name="exampleQueue">
|
<address name="exampleQueue">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="exampleQueue"/>
|
<queue name="exampleQueue"/>
|
||||||
|
|
|
@ -119,6 +119,11 @@ under the License.
|
||||||
<multicast>
|
<multicast>
|
||||||
</multicast>
|
</multicast>
|
||||||
</address>
|
</address>
|
||||||
|
<address name="DLQ">
|
||||||
|
<anycast>
|
||||||
|
<queue name="DLQ"/>
|
||||||
|
</anycast>
|
||||||
|
</address>
|
||||||
<address name="exampleQueue">
|
<address name="exampleQueue">
|
||||||
<anycast>
|
<anycast>
|
||||||
<queue name="exampleQueue"/>
|
<queue name="exampleQueue"/>
|
||||||
|
|
|
@ -41,6 +41,10 @@ import org.junit.runners.Parameterized;
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class SoakPagingTest extends SmokeTestBase {
|
public class SoakPagingTest extends SmokeTestBase {
|
||||||
|
|
||||||
|
public static final int LAG_CONSUMER_TIME = 1000;
|
||||||
|
public static final int TIME_RUNNING = 4000;
|
||||||
|
public static final int CLIENT_KILLS = 2;
|
||||||
|
|
||||||
String protocol;
|
String protocol;
|
||||||
String consumerType;
|
String consumerType;
|
||||||
boolean transaction;
|
boolean transaction;
|
||||||
|
@ -86,12 +90,13 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
|
|
||||||
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
|
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
|
||||||
if (protocol.toUpperCase().equals("OPENWIRE")) {
|
if (protocol.toUpperCase().equals("OPENWIRE")) {
|
||||||
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
|
return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")");
|
||||||
} else if (protocol.toUpperCase().equals("AMQP")) {
|
} else if (protocol.toUpperCase().equals("AMQP")) {
|
||||||
|
|
||||||
if (uri.startsWith("tcp://")) {
|
if (uri.startsWith("tcp://")) {
|
||||||
// replacing tcp:// by amqp://
|
// replacing tcp:// by amqp://
|
||||||
uri = "amqp" + uri.substring(3);
|
uri = "amqp" + uri.substring(3);
|
||||||
|
|
||||||
}
|
}
|
||||||
return new JmsConnectionFactory(uri);
|
return new JmsConnectionFactory(uri);
|
||||||
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
|
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
|
||||||
|
@ -158,25 +163,14 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testPagingReplication() throws Throwable {
|
public void testPagingReplication() throws Throwable {
|
||||||
|
|
||||||
Process queueProcess = null;
|
server1 = startServer(SERVER_NAME_1, 0, 30000);
|
||||||
if (consumerType.equals("queue")) {
|
|
||||||
queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < CLIENT_KILLS; i++) {
|
||||||
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction);
|
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "" + TIME_RUNNING, "" + transaction);
|
||||||
|
|
||||||
if (i == 0) {
|
|
||||||
server1 = startServer(SERVER_NAME_1, 0, 30000);
|
|
||||||
}
|
|
||||||
|
|
||||||
int result = process.waitFor();
|
int result = process.waitFor();
|
||||||
Assert.assertTrue(result > 0);
|
Assert.assertTrue(result > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueProcess != null) {
|
|
||||||
Assert.assertTrue(queueProcess.waitFor() > 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void produce(ConnectionFactory factory) {
|
public void produce(ConnectionFactory factory) {
|
||||||
|
@ -261,7 +255,8 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
messageConsumer = session.createConsumer(address);
|
messageConsumer = session.createConsumer(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(5000);
|
if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
|
@ -385,6 +385,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
||||||
journal.debugWait();
|
journal.debugWait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean tryUpdate(final long argument) throws Exception {
|
||||||
|
byte[] updateRecord = generateRecord(recordLength);
|
||||||
|
|
||||||
|
beforeJournalOperation();
|
||||||
|
|
||||||
|
boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
protected void update(final long... arguments) throws Exception {
|
protected void update(final long... arguments) throws Exception {
|
||||||
for (long element : arguments) {
|
for (long element : arguments) {
|
||||||
byte[] updateRecord = generateRecord(recordLength);
|
byte[] updateRecord = generateRecord(recordLength);
|
||||||
|
@ -411,6 +425,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
||||||
journal.debugWait();
|
journal.debugWait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean tryDelete(final long argument) throws Exception {
|
||||||
|
beforeJournalOperation();
|
||||||
|
|
||||||
|
boolean result = journal.tryAppendDeleteRecord(argument, sync);
|
||||||
|
|
||||||
|
if (result) {
|
||||||
|
removeRecordsForID(argument);
|
||||||
|
}
|
||||||
|
|
||||||
|
journal.debugWait();
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
protected void addTx(final long txID, final long... arguments) throws Exception {
|
protected void addTx(final long txID, final long... arguments) throws Exception {
|
||||||
TransactionHolder tx = getTransaction(txID);
|
TransactionHolder tx = getTransaction(txID);
|
||||||
|
|
||||||
|
|
|
@ -2688,6 +2688,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
loadAndCheck();
|
loadAndCheck();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryIsolation2() throws Exception {
|
||||||
|
setup(10, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
addTx(1, 1, 2, 3);
|
||||||
|
|
||||||
|
Assert.assertFalse(tryUpdate(1));
|
||||||
|
|
||||||
|
stopJournal();
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
loadAndCheck();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIsolation3() throws Exception {
|
public void testIsolation3() throws Exception {
|
||||||
setup(10, 10 * 1024, true);
|
setup(10, 10 * 1024, true);
|
||||||
|
@ -2708,6 +2724,22 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
loadAndCheck();
|
loadAndCheck();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryDelete() throws Exception {
|
||||||
|
setup(10, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
addTx(1, 1, 2, 3);
|
||||||
|
|
||||||
|
Assert.assertFalse(tryDelete(1));
|
||||||
|
|
||||||
|
stopJournal();
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
loadAndCheck();
|
||||||
|
}
|
||||||
|
|
||||||
// XA tests
|
// XA tests
|
||||||
// ========
|
// ========
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue