ARTEMIS-4925 Fixing issue with mirroring and replication

the page writes on replica should include the address where they belong to avoid leaks between the mirrorSNF and the actual addresses.
This commit is contained in:
Clebert Suconic 2024-07-16 10:58:31 -04:00 committed by clebertsuconic
parent 5af90578cb
commit 6c1134388b
12 changed files with 54 additions and 30 deletions

View File

@ -180,7 +180,7 @@ public final class Page {
public synchronized void write(final PagedMessage message) throws Exception {
writeDirect(message);
storageManager.pageWrite(message, pageId);
storageManager.pageWrite(storeName, message, pageId);
}
/** This write will not interact back with the storage manager.

View File

@ -1535,7 +1535,7 @@ public class PagingStoreImpl implements PagingStore {
final long newPageId = currentPageId + 1;
if (logger.isTraceEnabled()) {
logger.trace("new pageNr={}", newPageId);
logger.trace("destination {} new pageNr={}", storeName, newPageId);
}
final Page oldPage = currentPage;

View File

@ -126,11 +126,11 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
// Message related operations
void pageClosed(SimpleString storeName, long pageNumber);
void pageClosed(SimpleString address, long pageNumber);
void pageDeleted(SimpleString storeName, long pageNumber);
void pageDeleted(SimpleString address, long pageNumber);
void pageWrite(PagedMessage message, long pageNumber);
void pageWrite(SimpleString address, PagedMessage message, long pageNumber);
void afterCompleteOperations(IOCallback run);

View File

@ -394,7 +394,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
@Override
public void pageWrite(final PagedMessage message, final long pageNumber) {
public void pageWrite(final SimpleString address, final PagedMessage message, final long pageNumber) {
if (messageJournal.isHistory()) {
try (ArtemisCloseable lock = closeableReadLock()) {
@ -421,7 +421,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated())
replicator.pageWrite(message, pageNumber);
replicator.pageWrite(address, message, pageNumber);
}
}
}
@ -703,8 +703,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
Map<SimpleString, Collection<Integer>> info = new HashMap<>();
for (SimpleString storeName : pagingManager.getStoreNames()) {
PagingStore store = pagingManager.getPageStore(storeName);
info.put(storeName, store.getCurrentIds());
store.forceAnotherPage();
Collection<Integer> ids = store.getCurrentIds();
info.put(storeName, ids);
if (!ids.isEmpty()) {
store.forceAnotherPage();
}
}
return info;
}

View File

@ -402,7 +402,7 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void pageWrite(final PagedMessage message, final long pageNumber) {
public void pageWrite(final SimpleString address, final PagedMessage message, final long pageNumber) {
}
@Override

View File

@ -201,7 +201,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
case REPLICATION_PAGE_WRITE: {
packet = new ReplicationPageWriteMessage(connection.isVersionUsingLongOnPageReplication());
packet = new ReplicationPageWriteMessage(connection.isVersionUsingLongOnPageReplication(), coreMessageObjectPools);
break;
}
case REPLICATION_PAGE_EVENT: {

View File

@ -18,8 +18,10 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
@ -29,28 +31,35 @@ public class ReplicationPageWriteMessage extends PacketImpl implements MessagePa
protected PagedMessage pagedMessage;
protected SimpleString address;
final boolean useLong;
public ReplicationPageWriteMessage(final boolean useLong) {
final CoreMessageObjectPools coreMessageObjectPools;
public ReplicationPageWriteMessage(final boolean useLong, CoreMessageObjectPools coreMessageObjectPools) {
super(PacketImpl.REPLICATION_PAGE_WRITE);
this.useLong = useLong;
this.coreMessageObjectPools = coreMessageObjectPools;
}
public ReplicationPageWriteMessage(final PagedMessage pagedMessage, final long pageNumber, final boolean useLong) {
this(useLong);
public ReplicationPageWriteMessage(final PagedMessage pagedMessage, final long pageNumber, final boolean useLong, final SimpleString address) {
this(useLong, null);
this.pageNumber = pageNumber;
this.pagedMessage = pagedMessage;
this.address = address;
}
@Override
public int expectedEncodeSize() {
if (useLong) {
return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + // buffer.writeLong(pageNumber);
pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + // buffer.writeLong(pageNumber)
pagedMessage.getEncodeSize() + // pagedMessage.encode(buffer)
SimpleString.sizeofString(address); // SizeUtil.writeNullableString(address)
} else {
return PACKET_HEADERS_SIZE + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
return PACKET_HEADERS_SIZE + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber)
pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer)
}
}
@ -65,6 +74,9 @@ public class ReplicationPageWriteMessage extends PacketImpl implements MessagePa
buffer.writeInt((int) pageNumber);
}
pagedMessage.encode(buffer);
if (useLong) {
buffer.writeNullableSimpleString(address);
}
}
@Override
@ -76,6 +88,17 @@ public class ReplicationPageWriteMessage extends PacketImpl implements MessagePa
}
pagedMessage = new PagedMessageImpl(0, null);
pagedMessage.decode(buffer);
if (buffer.readableBytes() > 0) {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), coreMessageObjectPools.getAddressDecoderPool());
}
}
public SimpleString getAddress() {
if (address != null) {
return address;
} else {
return pagedMessage.getMessage().getAddressSimpleString();
}
}
/**

View File

@ -35,7 +35,6 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
@ -831,8 +830,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception {
PagedMessage pgdMessage = packet.getPagedMessage();
pgdMessage.initMessage(storageManager);
Message msg = pgdMessage.getMessage();
Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber());
Page page = getPage(packet.getAddress(), packet.getPageNumber());
page.writeDirect(pgdMessage);
}

View File

@ -299,9 +299,9 @@ public final class ReplicationManager implements ActiveMQComponent {
}
}
public void pageWrite(final PagedMessage message, final long pageNumber) {
public void pageWrite(final SimpleString address, final PagedMessage message, final long pageNumber) {
if (started) {
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber, remotingConnection.isVersionUsingLongOnPageReplication()));
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber, remotingConnection.isVersionUsingLongOnPageReplication(), address));
}
}

View File

@ -420,7 +420,7 @@ public class TransactionImplTest extends ServerTestBase {
}
@Override
public void pageWrite(PagedMessage message, long pageNumber) {
public void pageWrite(SimpleString address, PagedMessage message, long pageNumber) {
}

View File

@ -378,8 +378,8 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
public void pageWrite(PagedMessage message, long pageNumber) {
manager.pageWrite(message, pageNumber);
public void pageWrite(SimpleString address, PagedMessage message, long pageNumber) {
manager.pageWrite(address, message, pageNumber);
}
@Override

View File

@ -275,10 +275,10 @@ public final class ReplicationTest extends ActiveMQTestBase {
replicatedJournal.appendAddRecordTransactional(23, 24, (byte) 1, new FakeData());
PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
manager.pageWrite(pgmsg, 1);
manager.pageWrite(pgmsg, 2);
manager.pageWrite(pgmsg, 3);
manager.pageWrite(pgmsg, 4);
manager.pageWrite(dummy, pgmsg, 1);
manager.pageWrite(dummy, pgmsg, 2);
manager.pageWrite(dummy, pgmsg, 3);
manager.pageWrite(dummy, pgmsg, 4);
blockOnReplication(storage, manager);