From d50f577cd50df37634f592db65200861fe3e13d3 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Thu, 29 Jun 2017 00:03:47 +0800 Subject: [PATCH] ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup When a large message is replicated to backup, a pendingID is generated when the large message is finished. This pendingID is generated by a BatchingIDGenerator at backup. It is possible that a pendingID generated at backup may be a duplicate to an ID generated at live server. This can cause a problem when a large message with a messageID that is the same as another largemessage's pendingID is replicated and stored in the backup's journal, and then a deleteRecord for the pendingID is appended. If backup becomes live and loads the journal, it will drop the large message add record because there is a deleteRecord of the same ID (even though it is a pendingID of another message). As a result the expecting client will never get this large message. So in summary, the root cause is that the pendingIDs for large messages are generated at backup while backup is not alive. The solution to this is that instead of the backup generating the pendingID, we make them all be generated in advance at live server and let them replicated to backup whereever needed. The ID generater at backup only works when backup becomes live (when it is properly initialized from journal). --- .../AbstractJournalStorageManager.java | 2 +- .../impl/journal/JournalStorageManager.java | 19 ++- .../impl/journal/LargeServerMessageImpl.java | 2 +- .../journal/LargeServerMessageInSync.java | 10 ++ .../ReplicationLargeMessageEndMessage.java | 19 ++- .../replication/ReplicatedLargeMessage.java | 4 + .../core/replication/ReplicationEndpoint.java | 1 + .../core/replication/ReplicationManager.java | 7 +- .../core/server/LargeServerMessage.java | 6 +- .../failover/FailoverTestWithDivert.java | 148 ++++++++++++++++++ .../replication/ReplicationTest.java | 4 +- 11 files changed, 203 insertions(+), 19 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 4ba8e82be1..0eb1dc312b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1565,7 +1565,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (largeServerMessage.getPendingRecordID() >= 0) { try { confirmPendingLargeMessage(largeServerMessage.getPendingRecordID()); - largeServerMessage.setPendingRecordID(-1); + largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID); } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 803c441405..ba7bb867e8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -274,7 +274,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId); } if (replicator != null) { - replicator.largeMessageDelete(largeMsgId); + replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this); } } largeMessagesToDelete.clear(); @@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager { journalFF.releaseBuffer(buffer); } - public long storePendingLargeMessage(final long messageID) throws Exception { + public long storePendingLargeMessage(final long messageID, long recordID) throws Exception { readLock(); try { - long recordID = generateID(); + if (recordID == LargeServerMessage.NO_PENDING_ID) { + recordID = generateID(); + } else { + //this means the large message doesn't + //have a pendingRecordID, but one has been + //generated (coming from live server) for use. + recordID = -recordID; + } messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true)); @@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // And the client won't be waiting for the actual file to be deleted. // We set a temporary record (short lived) on the journal // to avoid a situation where the server is restarted and pending large message stays on forever - largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID())); + largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID())); } catch (Exception e) { throw new ActiveMQInternalErrorException(e.getMessage(), e); } @@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { readLock(); try { if (replicator != null) { - replicator.largeMessageDelete(largeServerMessage.getMessageID()); + replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this); } file.delete(); @@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (largeMessage.isDurable()) { // We store a marker on the journal that the large file is pending - long pendingRecordID = storePendingLargeMessage(id); + long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID); largeMessage.setPendingRecordID(pendingRecordID); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 79c1a85b9b..b8236af4f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -42,7 +42,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe private final JournalStorageManager storageManager; - private long pendingRecordID = -1; + private long pendingRecordID = NO_PENDING_ID; private boolean paged; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 42126d44b6..66ccd8c2bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes); } + @Override + public void setPendingRecordID(long pendingRecordID) { + mainLM.setPendingRecordID(pendingRecordID); + } + + @Override + public long getPendingRecordID() { + return mainLM.getPendingRecordID(); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index 4a09cc034c..a9be86a4cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageEndMessage extends PacketImpl { long messageId; + long pendingRecordId; public ReplicationLargeMessageEndMessage() { super(PacketImpl.REPLICATION_LARGE_MESSAGE_END); } - public ReplicationLargeMessageEndMessage(final long messageId) { + public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) { this(); this.messageId = messageId; + //we use negative value to indicate that this id is pre-generated by live node + //so that it won't be generated at backup. + //see https://issues.apache.org/jira/browse/ARTEMIS-1221 + this.pendingRecordId = -pendingRecordId; } - @Override public int expectedEncodeSize() { return PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + DataConstants.SIZE_LONG + // buffer.writeLong(messageId) + DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId); } @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); + buffer.writeLong(pendingRecordId); } @Override public void decodeRest(final ActiveMQBuffer buffer) { messageId = buffer.readLong(); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + pendingRecordId = buffer.readLong(); + } } /** @@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { return false; return true; } + + public long getPendingRecordId() { + return pendingRecordId; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java index 3b6327a89e..b744805a9d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java @@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage { */ void addBytes(byte[] body) throws Exception; + void setPendingRecordID(long pendingRecordID); + + long getPendingRecordID(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 6f899f33cc..f879aeb434 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -479,6 +479,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); if (message != null) { + message.setPendingRecordID(packet.getPendingRecordId()); executor.execute(new Runnable() { @Override public void run() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 8a624ceaf6..398f4527f5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; @@ -241,9 +242,11 @@ public final class ReplicationManager implements ActiveMQComponent { } } - public void largeMessageDelete(final Long messageId) { + //we pass in storageManager to generate ID only if enabled + public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) { if (enabled) { - sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId)); + long pendingRecordID = storageManager.generateID(); + sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID)); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 6fcc802ee0..a80e3692b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -23,13 +23,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage { + long NO_PENDING_ID = -1; + @Override void addBytes(byte[] bytes) throws Exception; - void setPendingRecordID(long pendingRecordID); - - long getPendingRecordID(); - /** * We have to copy the large message content in case of DLQ and paged messages * For that we need to pre-mark the LargeMessage with a flag when it is paged diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java new file mode 100644 index 0000000000..76efc22642 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FailoverTestWithDivert extends FailoverTestBase { + + private static final String DIVERT_ADDRESS = "jms.queue.testQueue"; + private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue"; + private ClientSessionFactoryInternal sf; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return getNettyAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return getNettyConnectorTransportConfiguration(live); + } + + @Override + protected void createConfigs() throws Exception { + createReplicatedConfigs(); + + liveConfig.setJournalFileSize(10240000); + backupConfig.setJournalFileSize(10240000); + addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS); + addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS); + addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false); + addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false); + } + + private void addQueue(Configuration serverConfig, String address, String name) { + + List addrConfigs = serverConfig.getAddressConfigurations(); + CoreAddressConfiguration addrCfg = new CoreAddressConfiguration(); + addrCfg.setName(address); + addrCfg.addRoutingType(RoutingType.ANYCAST); + CoreQueueConfiguration qConfig = new CoreQueueConfiguration(); + qConfig.setName(name); + qConfig.setAddress(address); + addrCfg.addQueueConfiguration(qConfig); + addrConfigs.add(addrCfg); + } + + private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) { + List divertConfigs = serverConfig.getDivertConfigurations(); + DivertConfiguration newDivert = new DivertConfiguration(); + newDivert.setName("myDivert"); + newDivert.setAddress(source); + newDivert.setForwardingAddress(target); + newDivert.setExclusive(exclusive); + divertConfigs.add(newDivert); + } + + @Test + public void testUniqueIDsWithDivert() throws Exception { + Map params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1); + sf = createSessionFactoryAndWaitForTopology(locator, 2); + int minLarge = locator.getMinLargeMessageSize(); + + ClientSession session = sf.createSession(false, false); + addClientSession(session); + session.start(); + + final int num = 100; + ClientProducer producer = session.createProducer(DIVERT_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage message = createLargeMessage(session, 2 * minLarge); + producer.send(message); + } + session.commit(); + + ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage receivedFromSourceQueue = consumer.receive(5000); + assertNotNull(receivedFromSourceQueue); + receivedFromSourceQueue.acknowledge(); + } + session.commit(); + + crash(session); + + ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage receivedFromTargetQueue = consumer1.receive(5000); + assertNotNull(receivedFromTargetQueue); + receivedFromTargetQueue.acknowledge(); + } + session.commit(); + } + + private ClientMessage createLargeMessage(ClientSession session, final int largeSize) { + ClientMessage message = session.createMessage(true); + ActiveMQBuffer bodyBuffer = message.getBodyBuffer(); + final int propSize = 10240; + while (bodyBuffer.writerIndex() < largeSize) { + byte[] prop = new byte[propSize]; + bodyBuffer.writeBytes(prop); + } + return message; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index ab32517911..d67b980dee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase { public void testSendPackets() throws Exception { setupServer(true); - StorageManager storage = getStorage(); + JournalStorageManager storage = getStorage(); manager = liveServer.getReplicationManager(); waitForComponent(manager); @@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase { manager.largeMessageWrite(500, new byte[1024]); - manager.largeMessageDelete(Long.valueOf(500)); + manager.largeMessageDelete(Long.valueOf(500), storage); blockOnReplication(storage, manager);