diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index bf682ff7a1..5ea104bca3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1546,7 +1546,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (largeServerMessage.getPendingRecordID() >= 0) { try { confirmPendingLargeMessage(largeServerMessage.getPendingRecordID()); - largeServerMessage.setPendingRecordID(-1); + largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID); } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index e79a9cbccb..ca1b805e34 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -272,7 +272,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId); } if (replicator != null) { - replicator.largeMessageDelete(largeMsgId); + replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this); } } largeMessagesToDelete.clear(); @@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager { journalFF.releaseBuffer(buffer); } - public long storePendingLargeMessage(final long messageID) throws Exception { + public long storePendingLargeMessage(final long messageID, long recordID) throws Exception { readLock(); try { - long recordID = generateID(); + if (recordID == LargeServerMessage.NO_PENDING_ID) { + recordID = generateID(); + } else { + //this means the large message doesn't + //have a pendingRecordID, but one has been + //generated (coming from live server) for use. + recordID = -recordID; + } messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true)); @@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // And the client won't be waiting for the actual file to be deleted. // We set a temporary record (short lived) on the journal // to avoid a situation where the server is restarted and pending large message stays on forever - largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID())); + largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID())); } catch (Exception e) { throw new ActiveMQInternalErrorException(e.getMessage(), e); } @@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { readLock(); try { if (replicator != null) { - replicator.largeMessageDelete(largeServerMessage.getMessageID()); + replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this); } file.delete(); @@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (largeMessage.isDurable()) { // We store a marker on the journal that the large file is pending - long pendingRecordID = storePendingLargeMessage(id); + long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID); largeMessage.setPendingRecordID(pendingRecordID); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 22929e7384..22cfa0b44b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -44,7 +44,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L private final JournalStorageManager storageManager; - private long pendingRecordID = -1; + private long pendingRecordID = NO_PENDING_ID; private boolean paged; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 42126d44b6..66ccd8c2bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes); } + @Override + public void setPendingRecordID(long pendingRecordID) { + mainLM.setPendingRecordID(pendingRecordID); + } + + @Override + public long getPendingRecordID() { + return mainLM.getPendingRecordID(); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index 4a09cc034c..a9be86a4cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageEndMessage extends PacketImpl { long messageId; + long pendingRecordId; public ReplicationLargeMessageEndMessage() { super(PacketImpl.REPLICATION_LARGE_MESSAGE_END); } - public ReplicationLargeMessageEndMessage(final long messageId) { + public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) { this(); this.messageId = messageId; + //we use negative value to indicate that this id is pre-generated by live node + //so that it won't be generated at backup. + //see https://issues.apache.org/jira/browse/ARTEMIS-1221 + this.pendingRecordId = -pendingRecordId; } - @Override public int expectedEncodeSize() { return PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + DataConstants.SIZE_LONG + // buffer.writeLong(messageId) + DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId); } @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); + buffer.writeLong(pendingRecordId); } @Override public void decodeRest(final ActiveMQBuffer buffer) { messageId = buffer.readLong(); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + pendingRecordId = buffer.readLong(); + } } /** @@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { return false; return true; } + + public long getPendingRecordId() { + return pendingRecordId; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java index 3b6327a89e..b744805a9d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java @@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage { */ void addBytes(byte[] body) throws Exception; + void setPendingRecordID(long pendingRecordID); + + long getPendingRecordID(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 5a01bf7212..d6f807ce17 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -519,6 +519,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); if (message != null) { + message.setPendingRecordID(packet.getPendingRecordId()); executor.execute(new Runnable() { @Override public void run() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d8d70f0f0e..e1027d48c9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; @@ -238,9 +239,11 @@ public final class ReplicationManager implements ActiveMQComponent { } } - public void largeMessageDelete(final Long messageId) { + //we pass in storageManager to generate ID only if enabled + public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) { if (enabled) { - sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId)); + long pendingRecordID = storageManager.generateID(); + sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID)); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 2a16ed258b..38f36ad24d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -22,13 +22,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage { + long NO_PENDING_ID = -1; + @Override void addBytes(byte[] bytes) throws Exception; - void setPendingRecordID(long pendingRecordID); - - long getPendingRecordID(); - /** * We have to copy the large message content in case of DLQ and paged messages * For that we need to pre-mark the LargeMessage with a flag when it is paged diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java new file mode 100644 index 0000000000..76efc22642 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FailoverTestWithDivert extends FailoverTestBase { + + private static final String DIVERT_ADDRESS = "jms.queue.testQueue"; + private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue"; + private ClientSessionFactoryInternal sf; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return getNettyAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return getNettyConnectorTransportConfiguration(live); + } + + @Override + protected void createConfigs() throws Exception { + createReplicatedConfigs(); + + liveConfig.setJournalFileSize(10240000); + backupConfig.setJournalFileSize(10240000); + addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS); + addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS); + addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false); + addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false); + } + + private void addQueue(Configuration serverConfig, String address, String name) { + + List addrConfigs = serverConfig.getAddressConfigurations(); + CoreAddressConfiguration addrCfg = new CoreAddressConfiguration(); + addrCfg.setName(address); + addrCfg.addRoutingType(RoutingType.ANYCAST); + CoreQueueConfiguration qConfig = new CoreQueueConfiguration(); + qConfig.setName(name); + qConfig.setAddress(address); + addrCfg.addQueueConfiguration(qConfig); + addrConfigs.add(addrCfg); + } + + private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) { + List divertConfigs = serverConfig.getDivertConfigurations(); + DivertConfiguration newDivert = new DivertConfiguration(); + newDivert.setName("myDivert"); + newDivert.setAddress(source); + newDivert.setForwardingAddress(target); + newDivert.setExclusive(exclusive); + divertConfigs.add(newDivert); + } + + @Test + public void testUniqueIDsWithDivert() throws Exception { + Map params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1); + sf = createSessionFactoryAndWaitForTopology(locator, 2); + int minLarge = locator.getMinLargeMessageSize(); + + ClientSession session = sf.createSession(false, false); + addClientSession(session); + session.start(); + + final int num = 100; + ClientProducer producer = session.createProducer(DIVERT_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage message = createLargeMessage(session, 2 * minLarge); + producer.send(message); + } + session.commit(); + + ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage receivedFromSourceQueue = consumer.receive(5000); + assertNotNull(receivedFromSourceQueue); + receivedFromSourceQueue.acknowledge(); + } + session.commit(); + + crash(session); + + ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage receivedFromTargetQueue = consumer1.receive(5000); + assertNotNull(receivedFromTargetQueue); + receivedFromTargetQueue.acknowledge(); + } + session.commit(); + } + + private ClientMessage createLargeMessage(ClientSession session, final int largeSize) { + ClientMessage message = session.createMessage(true); + ActiveMQBuffer bodyBuffer = message.getBodyBuffer(); + final int propSize = 10240; + while (bodyBuffer.writerIndex() < largeSize) { + byte[] prop = new byte[propSize]; + bodyBuffer.writeBytes(prop); + } + return message; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 1ae9527d1b..398e895c08 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase { public void testSendPackets() throws Exception { setupServer(true); - StorageManager storage = getStorage(); + JournalStorageManager storage = getStorage(); manager = liveServer.getReplicationManager(); waitForComponent(manager); @@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase { manager.largeMessageWrite(500, new byte[1024]); - manager.largeMessageDelete(Long.valueOf(500)); + manager.largeMessageDelete(Long.valueOf(500), storage); blockOnReplication(storage, manager);