ARTEMIS-27 / ARTEMIS-338 Refactor Journal Encodings into new package

This commit is contained in:
Martyn Taylor 2016-01-06 13:50:35 +00:00 committed by Clebert Suconic
parent aab09a77d3
commit 9b351d8236
33 changed files with 1521 additions and 993 deletions

View File

@ -47,7 +47,8 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -235,7 +236,7 @@ public class PrintData extends LockAbstract {
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
@ -248,7 +249,7 @@ public class PrintData extends LockAbstract {
set.add(encoding.position);
}
else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE) {
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Long queueID = Long.valueOf(encoding.queueID);
@ -260,7 +261,7 @@ public class PrintData extends LockAbstract {
}
else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
if (record.isUpdate) {
JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);

View File

@ -71,10 +71,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;

View File

@ -21,17 +21,33 @@ import java.util.List;
public class PreparedTransactionInfo {
public final long id;
private final long id;
public final byte[] extraData;
private final byte[] extraData;
public final List<RecordInfo> records = new ArrayList<>();
private final List<RecordInfo> records = new ArrayList<RecordInfo>();
public final List<RecordInfo> recordsToDelete = new ArrayList<>();
private final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public PreparedTransactionInfo(final long id, final byte[] extraData) {
this.id = id;
this.extraData = extraData;
}
public long getId() {
return id;
}
public byte[] getExtraData() {
return extraData;
}
public List<RecordInfo> getRecords() {
return records;
}
public List<RecordInfo> getRecordsToDelete() {
return recordsToDelete;
}
}

View File

@ -1786,9 +1786,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.records.addAll(transaction.recordInfos);
info.getRecords().addAll(transaction.recordInfos);
info.recordsToDelete.addAll(transaction.recordsToDelete);
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
loadManager.addPreparedTransaction(info);
}

View File

@ -22,13 +22,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecord extends JournalInternalRecord {
private final long id;
protected final long id;
private final EncodingSupport record;
protected final EncodingSupport record;
private final byte recordType;
protected final byte recordType;
private final boolean add;
protected final boolean add;
/**
* @param id

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
public final class AckDescribe {
public RefEncoding refEncoding;
public AckDescribe(RefEncoding refEncoding) {
this.refEncoding = refEncoding;
}
@Override
public String toString() {
return "ACK;" + refEncoding;
}
}

View File

@ -26,9 +26,11 @@ public final class AddMessageRecord {
final ServerMessage message;
long scheduledDeliveryTime;
// mtaylor (Added to compile)
public long scheduledDeliveryTime;
int deliveryCount;
// mtaylor (Added to compile)
public int deliveryCount;
public ServerMessage getMessage() {
return message;
@ -41,4 +43,5 @@ public final class AddMessageRecord {
public int getDeliveryCount() {
return deliveryCount;
}
}

View File

@ -28,31 +28,30 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DeliveryCountUpdateEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DuplicateIDEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.HeuristicCompletionEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.LargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountPendingImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@ -200,15 +199,15 @@ public final class DescribeJournal {
public void checkRecordCounter(RecordInfo info) {
if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) newObjectEncoding(info);
long queueIDForCounter = encoding.queueID;
long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.value) {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.value);
if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.getValue()) {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
}
subsCounter.loadValue(info.id, encoding.value);
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
if (subsCounter.getValue() < 0) {
@ -221,13 +220,13 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info);
long queueIDForCounter = encoding.queueID;
long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.value);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.value);
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
}
@ -311,20 +310,20 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) o;
queueIDForCounter = encoding.queueID;
queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadValue(info.id, encoding.value);
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) o;
queueIDForCounter = encoding.queueID;
queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.value);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
}
@ -345,8 +344,8 @@ public final class DescribeJournal {
out.println("### Prepared TX ###");
for (PreparedTransactionInfo tx : preparedTransactions) {
out.println(tx.id);
for (RecordInfo info : tx.records) {
out.println(tx.getId());
for (RecordInfo info : tx.getRecords()) {
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o));
if (info.getUserRecordType() == 31) {
@ -365,7 +364,7 @@ public final class DescribeJournal {
}
}
for (RecordInfo info : tx.recordsToDelete) {
for (RecordInfo info : tx.getRecordsToDelete()) {
out.println("- " + describeRecord(info) + " <marked to delete>");
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
final class DummyOperationContext implements OperationContext {
private static DummyOperationContext instance = new DummyOperationContext();
public static OperationContext getInstance() {
return DummyOperationContext.instance;
}
public void executeOnCompletion(final IOCallback runnable) {
// There are no executeOnCompletion calls while using the DummyOperationContext
// However we keep the code here for correctness
runnable.done();
}
public void replicationDone() {
}
public void replicationLineUp() {
}
public void storeLineUp() {
}
public void done() {
}
public void onError(final int errorCode, final String errorMessage) {
}
public void waitCompletion() {
}
public boolean waitCompletion(final long timeout) {
return true;
}
public void pageSyncLineUp() {
}
public void pageSyncDone() {
}
}

View File

@ -26,7 +26,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
public final class JournalRecordIds {
// grouping journal record type
static final byte GROUP_RECORD = 20;
// mtaylor Added to compile
public static final byte GROUP_RECORD = 20;
// BindingsImpl journal record type

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
private AbstractJournalStorageManager journalStorageManager;
private final Map<Long, ServerMessage> messages;
public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
final Map<Long, ServerMessage> messages) {
super();
this.journalStorageManager = journalStorageManager;
this.messages = messages;
}
public void failedTransaction(final long transactionID,
final List<RecordInfo> records,
final List<RecordInfo> recordsToDelete) {
for (RecordInfo record : records) {
if (record.userRecordType == ADD_LARGE_MESSAGE) {
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
try {
LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff);
serverMessage.decrementDelayDeletionCount();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalError(e);
}
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
public final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
private AbstractJournalStorageManager journalStorageManager;
public List<Long> confirmedMessages = new LinkedList<Long>();
public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager journalStorageManager) {
this.journalStorageManager = journalStorageManager;
}
@Override
public void afterRollback(Transaction tx) {
for (Long msg : confirmedMessages) {
try {
journalStorageManager.confirmPendingLargeMessage(msg);
}
catch (Throwable e) {
ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
}
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class CursorAckRecordEncoding implements EncodingSupport {
public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
this.queueID = queueID;
this.position = position;
}
public CursorAckRecordEncoding() {
this.position = new PagePositionImpl();
}
@Override
public String toString() {
return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
}
public long queueID;
public PagePosition position;
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(position.getPageNr());
buffer.writeInt(position.getMessageNr());
}
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
long pageNR = buffer.readLong();
int messageNR = buffer.readInt();
this.position = new PagePositionImpl(pageNR, messageNR);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class DeleteEncoding implements EncodingSupport {
public byte recordType;
public long id;
public DeleteEncoding(final byte recordType, final long id) {
this.recordType = recordType;
this.id = id;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
@Override
public int getEncodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
*/
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeByte(recordType);
buffer.writeLong(id);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
*/
@Override
public void decode(ActiveMQBuffer buffer) {
recordType = buffer.readByte();
id = buffer.readLong();
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class DeliveryCountUpdateEncoding implements EncodingSupport {
public long queueID;
public int count;
public DeliveryCountUpdateEncoding() {
super();
}
public DeliveryCountUpdateEncoding(final long queueID, final int count) {
super();
this.queueID = queueID;
this.count = count;
}
public void decode(final ActiveMQBuffer buffer) {
queueID = buffer.readLong();
count = buffer.readInt();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeInt(count);
}
public int getEncodeSize() {
return 8 + 4;
}
@Override
public String toString() {
return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUID;
public class DuplicateIDEncoding implements EncodingSupport {
public SimpleString address;
public byte[] duplID;
public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
this.address = address;
this.duplID = duplID;
}
public DuplicateIDEncoding() {
}
public void decode(final ActiveMQBuffer buffer) {
address = buffer.readSimpleString();
int size = buffer.readInt();
duplID = new byte[size];
buffer.readBytes(duplID);
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);
buffer.writeInt(duplID.length);
buffer.writeBytes(duplID);
}
public int getEncodeSize() {
return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
}
@Override
public String toString() {
// this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
// and this may be useful to validate the journal on those tests
// You may uncomment these two lines on that case and replcate the toString for the PrintData
// SimpleString simpleStr = new SimpleString(duplID);
// return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
String bridgeRepresentation = null;
// The bridge will generate IDs on these terms:
// This will make them easier to read
if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
try {
ByteBuffer buff = ByteBuffer.wrap(duplID);
// 16 for UUID
byte[] bytesUUID = new byte[16];
buff.get(bytesUUID);
UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
long id = buff.getLong();
bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
}
catch (Throwable ignored) {
bridgeRepresentation = null;
}
}
if (bridgeRepresentation != null) {
return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
bridgeRepresentation + "]";
}
else {
return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
/**
* This is only used when loading a transaction.
* <p>
* it might be possible to merge the functionality of this class with
* {@link FinishPageMessageOperation}
*/
// TODO: merge this class with the one on the PagingStoreImpl
public class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
@Override
public void afterCommit(final Transaction tx) {
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pageTransaction != null) {
pageTransaction.commit();
}
}
@Override
public void afterRollback(final Transaction tx) {
PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (tx.getState() == Transaction.State.PREPARED && pageTransaction != null) {
pageTransaction.rollback();
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
public class GroupingEncoding implements EncodingSupport, GroupingInfo {
public long id;
public SimpleString groupId;
public SimpleString clusterName;
public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName) {
this.id = id;
this.groupId = groupId;
this.clusterName = clusterName;
}
public GroupingEncoding() {
}
public int getEncodeSize() {
return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(groupId);
buffer.writeSimpleString(clusterName);
}
public void decode(final ActiveMQBuffer buffer) {
groupId = buffer.readSimpleString();
clusterName = buffer.readSimpleString();
}
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
public SimpleString getGroupId() {
return groupId;
}
public SimpleString getClusterName() {
return clusterName;
}
@Override
public String toString() {
return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.XidCodecSupport;
public class HeuristicCompletionEncoding implements EncodingSupport {
public Xid xid;
public boolean isCommit;
@Override
public String toString() {
return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
}
public HeuristicCompletionEncoding(final Xid xid, final boolean isCommit) {
this.xid = xid;
this.isCommit = isCommit;
}
public HeuristicCompletionEncoding() {
}
public void decode(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
isCommit = buffer.readBoolean();
}
public void encode(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
buffer.writeBoolean(isCommit);
}
public int getEncodeSize() {
return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
public class LargeMessageEncoding implements EncodingSupport {
public final LargeServerMessage message;
public LargeMessageEncoding(final LargeServerMessage message) {
this.message = message;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void decode(final ActiveMQBuffer buffer) {
message.decodeHeadersAndProperties(buffer);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void encode(final ActiveMQBuffer buffer) {
message.encode(buffer);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
public int getEncodeSize() {
return message.getEncodeSize();
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
@Override
public String toString() {
return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
}
public PageCountPendingImpl() {
}
public PageCountPendingImpl(long queueID, long pageID, int inc) {
this.queueID = queueID;
this.pageID = pageID;
}
long id;
long queueID;
long pageID;
public void setID(long id) {
this.id = id;
}
public long getID() {
return id;
}
public long getQueueID() {
return queueID;
}
public long getPageID() {
return pageID;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(pageID);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
pageID = buffer.readLong();
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountRecord implements EncodingSupport {
private long queueID;
private long value;
@Override
public String toString() {
return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
}
public PageCountRecord() {
}
public PageCountRecord(long queueID, long value) {
this.queueID = queueID;
this.value = value;
}
public long getQueueID() {
return queueID;
}
public long getValue() {
return value;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(value);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readLong();
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountRecordInc implements EncodingSupport {
private long queueID;
private int value;
@Override
public String toString() {
return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
}
public PageCountRecordInc() {
}
public PageCountRecordInc(long queueID, int value) {
this.queueID = queueID;
this.value = value;
}
public long getQueueID() {
return queueID;
}
public int getValue() {
return value;
}
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeInt(value);
}
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readInt();
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageUpdateTXEncoding implements EncodingSupport {
public long pageTX;
public int recods;
@Override
public String toString() {
return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
}
public PageUpdateTXEncoding() {
}
public PageUpdateTXEncoding(final long pageTX, final int records) {
this.pageTX = pageTX;
this.recods = records;
}
public void decode(ActiveMQBuffer buffer) {
this.pageTX = buffer.readLong();
this.recods = buffer.readInt();
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(pageTX);
buffer.writeInt(recods);
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PendingLargeMessageEncoding implements EncodingSupport {
public long largeMessageID;
public PendingLargeMessageEncoding(final long pendingLargeMessageID) {
this.largeMessageID = pendingLargeMessageID;
}
public PendingLargeMessageEncoding() {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void decode(final ActiveMQBuffer buffer) {
largeMessageID = buffer.readLong();
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(largeMessageID);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
public int getEncodeSize() {
return DataConstants.SIZE_LONG;
}
@Override
public String toString() {
return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.utils.DataConstants;
public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
public long id;
public SimpleString name;
public SimpleString address;
public SimpleString filterString;
public boolean autoCreated;
public SimpleString user;
public PersistentQueueBindingEncoding() {
}
@Override
public String toString() {
return "PersistentQueueBindingEncoding [id=" + id +
", name=" +
name +
", address=" +
address +
", filterString=" +
filterString +
", user=" +
user +
", autoCreated=" +
autoCreated +
"]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
final SimpleString filterString,
final SimpleString user,
final boolean autoCreated) {
this.name = name;
this.address = address;
this.filterString = filterString;
this.user = user;
this.autoCreated = autoCreated;
}
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
public SimpleString getAddress() {
return address;
}
public void replaceQueueName(SimpleString newName) {
this.name = newName;
}
public SimpleString getFilterString() {
return filterString;
}
public SimpleString getQueueName() {
return name;
}
public SimpleString getUser() {
return user;
}
public boolean isAutoCreated() {
return autoCreated;
}
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
address = buffer.readSimpleString();
filterString = buffer.readNullableSimpleString();
String metadata = buffer.readNullableSimpleString().toString();
if (metadata != null) {
String[] elements = metadata.split(";");
for (String element : elements) {
String[] keyValuePair = element.split("=");
if (keyValuePair.length == 2) {
if (keyValuePair[0].equals("user")) {
user = SimpleString.toSimpleString(keyValuePair[1]);
}
}
}
}
autoCreated = buffer.readBoolean();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
buffer.writeSimpleString(address);
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
}
public int getEncodeSize() {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata());
}
private SimpleString createMetadata() {
StringBuilder metadata = new StringBuilder();
metadata.append("user=").append(user).append(";");
return SimpleString.toSimpleString(metadata.toString());
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class QueueEncoding implements EncodingSupport {
public long queueID;
public QueueEncoding(final long queueID) {
super();
this.queueID = queueID;
}
public QueueEncoding() {
super();
}
public void decode(final ActiveMQBuffer buffer) {
queueID = buffer.readLong();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
}
public int getEncodeSize() {
return 8;
}
@Override
public String toString() {
return "QueueEncoding [queueID=" + queueID + "]";
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
public class RefEncoding extends QueueEncoding {
public RefEncoding() {
super();
}
public RefEncoding(final long queueID) {
super(queueID);
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public class ScheduledDeliveryEncoding extends QueueEncoding {
public long scheduledDeliveryTime;
@Override
public String toString() {
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
}
public ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {
super(queueID);
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
public ScheduledDeliveryEncoding() {
}
@Override
public int getEncodeSize() {
return super.getEncodeSize() + 8;
}
@Override
public void encode(final ActiveMQBuffer buffer) {
super.encode(buffer);
buffer.writeLong(scheduledDeliveryTime);
}
@Override
public void decode(final ActiveMQBuffer buffer) {
super.decode(buffer);
scheduledDeliveryTime = buffer.readLong();
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.XidCodecSupport;
/**
* It's public as other classes may want to unparse data on tools
*/
public class XidEncoding implements EncodingSupport {
public final Xid xid;
public XidEncoding(final Xid xid) {
this.xid = xid;
}
public XidEncoding(final byte[] data) {
xid = XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data));
}
public void decode(final ActiveMQBuffer buffer) {
throw new IllegalStateException("Non Supported Operation");
}
public void encode(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
public int getEncodeSize() {
return XidCodecSupport.getXidEncodeLength(xid);
}
}

View File

@ -839,10 +839,10 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
setupAndLoadJournal(JOURNAL_SIZE, 1);
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(1, transactions.get(0).recordsToDelete.size());
Assert.assertEquals(1, transactions.get(0).getRecordsToDelete().size());
Assert.assertEquals(1, records.size());
for (RecordInfo record : transactions.get(0).recordsToDelete) {
for (RecordInfo record : transactions.get(0).getRecordsToDelete()) {
byte[] data = record.data;
Assert.assertEquals(100, data.length);
for (byte element : data) {
@ -850,10 +850,10 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
}
}
Assert.assertEquals(10, transactions.get(0).extraData.length);
Assert.assertEquals(10, transactions.get(0).getExtraData().length);
for (int i = 0; i < 10; i++) {
Assert.assertEquals((byte) 1, transactions.get(0).extraData[i]);
Assert.assertEquals((byte) 1, transactions.get(0).getExtraData()[i]);
}
journalImpl.appendCommitRecord(1L, false);
@ -894,9 +894,9 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, records.size());
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(10, transactions.get(0).extraData.length);
Assert.assertEquals(10, transactions.get(0).getExtraData().length);
for (int i = 0; i < 10; i++) {
Assert.assertEquals((byte) 1, transactions.get(0).extraData[i]);
Assert.assertEquals((byte) 1, transactions.get(0).getExtraData()[i]);
}
journalImpl.checkReclaimStatus();
@ -925,9 +925,9 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(15, transactions.get(0).extraData.length);
Assert.assertEquals(15, transactions.get(0).getExtraData().length);
for (byte element : transactions.get(0).extraData) {
for (byte element : transactions.get(0).getExtraData()) {
Assert.assertEquals(2, element);
}

View File

@ -265,9 +265,9 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
if (entry.getValue().prepared) {
PreparedTransactionInfo info = new PreparedTransactionInfo(entry.getKey(), null);
info.records.addAll(entry.getValue().records);
info.getRecords().addAll(entry.getValue().records);
info.recordsToDelete.addAll(entry.getValue().deletes);
info.getRecordsToDelete().addAll(entry.getValue().deletes);
prepared.add(info);
}
@ -465,15 +465,15 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
PreparedTransactionInfo ractual = iterActual.next();
Assert.assertEquals("ids not same", rexpected.id, ractual.id);
Assert.assertEquals("ids not same", rexpected.getId(), ractual.getId());
checkRecordsEquivalent(rexpected.records, ractual.records);
checkRecordsEquivalent(rexpected.getRecords(), ractual.getRecords());
Assert.assertEquals("deletes size not same", rexpected.recordsToDelete.size(), ractual.recordsToDelete.size());
Assert.assertEquals("deletes size not same", rexpected.getRecordsToDelete().size(), ractual.getRecordsToDelete().size());
Iterator<RecordInfo> iterDeletesExpected = rexpected.recordsToDelete.iterator();
Iterator<RecordInfo> iterDeletesExpected = rexpected.getRecordsToDelete().iterator();
Iterator<RecordInfo> iterDeletesActual = ractual.recordsToDelete.iterator();
Iterator<RecordInfo> iterDeletesActual = ractual.getRecordsToDelete().iterator();
while (iterDeletesExpected.hasNext()) {
long lexpected = iterDeletesExpected.next().id;