ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

When a large message is replicated to backup, a pendingID is generated
when the large message is finished. This pendingID is generated by a
BatchingIDGenerator at backup.

It is possible that a pendingID generated at backup may be a duplicate
to an ID generated at live server.

This can cause a problem when a large message with a messageID that is
the same as another largemessage's pendingID is replicated and stored
in the backup's journal, and then a deleteRecord for the pendingID
is appended. If backup becomes live and loads the journal, it will
drop the large message add record because there is a deleteRecord of
the same ID (even though it is a pendingID of another message).
As a result the expecting client will never get this large message.

So in summary, the root cause is that the pendingIDs for large
messages are generated at backup while backup is not alive.

The solution to this is that instead of the backup generating
the pendingID, we make them all be generated in advance
at live server and let them replicated to backup whereever needed.
The ID generater at backup only works when backup becomes live
(when it is properly initialized from journal).
This commit is contained in:
Howard Gao 2017-06-29 00:03:47 +08:00
parent 70f6a29569
commit d50f577cd5
11 changed files with 203 additions and 19 deletions

View File

@ -1565,7 +1565,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
largeServerMessage.setPendingRecordID(-1);
largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}

View File

@ -274,7 +274,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
}
if (replicator != null) {
replicator.largeMessageDelete(largeMsgId);
replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
}
}
largeMessagesToDelete.clear();
@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
journalFF.releaseBuffer(buffer);
}
public long storePendingLargeMessage(final long messageID) throws Exception {
public long storePendingLargeMessage(final long messageID, long recordID) throws Exception {
readLock();
try {
long recordID = generateID();
if (recordID == LargeServerMessage.NO_PENDING_ID) {
recordID = generateID();
} else {
//this means the large message doesn't
//have a pendingRecordID, but one has been
//generated (coming from live server) for use.
recordID = -recordID;
}
messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// And the client won't be waiting for the actual file to be deleted.
// We set a temporary record (short lived) on the journal
// to avoid a situation where the server is restarted and pending large message stays on forever
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);
}
@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
readLock();
try {
if (replicator != null) {
replicator.largeMessageDelete(largeServerMessage.getMessageID());
replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
}
file.delete();
@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
if (largeMessage.isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
largeMessage.setPendingRecordID(pendingRecordID);
}

View File

@ -42,7 +42,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
private final JournalStorageManager storageManager;
private long pendingRecordID = -1;
private long pendingRecordID = NO_PENDING_ID;
private boolean paged;

View File

@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
}
@Override
public void setPendingRecordID(long pendingRecordID) {
mainLM.setPendingRecordID(pendingRecordID);
}
@Override
public long getPendingRecordID() {
return mainLM.getPendingRecordID();
}
}

View File

@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageEndMessage extends PacketImpl {
long messageId;
long pendingRecordId;
public ReplicationLargeMessageEndMessage() {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
}
public ReplicationLargeMessageEndMessage(final long messageId) {
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
this();
this.messageId = messageId;
//we use negative value to indicate that this id is pre-generated by live node
//so that it won't be generated at backup.
//see https://issues.apache.org/jira/browse/ARTEMIS-1221
this.pendingRecordId = -pendingRecordId;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
buffer.writeLong(pendingRecordId);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
messageId = buffer.readLong();
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
pendingRecordId = buffer.readLong();
}
}
/**
@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
return false;
return true;
}
public long getPendingRecordId() {
return pendingRecordId;
}
}

View File

@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
*/
void addBytes(byte[] body) throws Exception;
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
}

View File

@ -479,6 +479,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) {
message.setPendingRecordID(packet.getPendingRecordId());
executor.execute(new Runnable() {
@Override
public void run() {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@ -241,9 +242,11 @@ public final class ReplicationManager implements ActiveMQComponent {
}
}
public void largeMessageDelete(final Long messageId) {
//we pass in storageManager to generate ID only if enabled
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
if (enabled) {
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
long pendingRecordID = storageManager.generateID();
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
}
}

View File

@ -23,13 +23,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
long NO_PENDING_ID = -1;
@Override
void addBytes(byte[] bytes) throws Exception;
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FailoverTestWithDivert extends FailoverTestBase {
private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
private ClientSessionFactoryInternal sf;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return getNettyConnectorTransportConfiguration(live);
}
@Override
protected void createConfigs() throws Exception {
createReplicatedConfigs();
liveConfig.setJournalFileSize(10240000);
backupConfig.setJournalFileSize(10240000);
addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
}
private void addQueue(Configuration serverConfig, String address, String name) {
List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
addrCfg.setName(address);
addrCfg.addRoutingType(RoutingType.ANYCAST);
CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
qConfig.setName(name);
qConfig.setAddress(address);
addrCfg.addQueueConfiguration(qConfig);
addrConfigs.add(addrCfg);
}
private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) {
List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
DivertConfiguration newDivert = new DivertConfiguration();
newDivert.setName("myDivert");
newDivert.setAddress(source);
newDivert.setForwardingAddress(target);
newDivert.setExclusive(exclusive);
divertConfigs.add(newDivert);
}
@Test
public void testUniqueIDsWithDivert() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
TransportConfiguration tc = createTransportConfiguration(true, false, params);
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
int minLarge = locator.getMinLargeMessageSize();
ClientSession session = sf.createSession(false, false);
addClientSession(session);
session.start();
final int num = 100;
ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage message = createLargeMessage(session, 2 * minLarge);
producer.send(message);
}
session.commit();
ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromSourceQueue = consumer.receive(5000);
assertNotNull(receivedFromSourceQueue);
receivedFromSourceQueue.acknowledge();
}
session.commit();
crash(session);
ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
assertNotNull(receivedFromTargetQueue);
receivedFromTargetQueue.acknowledge();
}
session.commit();
}
private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
ClientMessage message = session.createMessage(true);
ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
final int propSize = 10240;
while (bodyBuffer.writerIndex() < largeSize) {
byte[] prop = new byte[propSize];
bodyBuffer.writeBytes(prop);
}
return message;
}
}

View File

@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
public void testSendPackets() throws Exception {
setupServer(true);
StorageManager storage = getStorage();
JournalStorageManager storage = getStorage();
manager = liveServer.getReplicationManager();
waitForComponent(manager);
@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
manager.largeMessageWrite(500, new byte[1024]);
manager.largeMessageDelete(Long.valueOf(500));
manager.largeMessageDelete(Long.valueOf(500), storage);
blockOnReplication(storage, manager);