diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 8807f4eb3e..7deebcb6e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -438,6 +438,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessage.setMessageID(id); + // We do this here so that replication never gets a list of files without this file, + // which would otherwise be a race + largeMessage.validateFile(); + if (largeMessage.isDurable()) { // We store a marker on the journal that the large file is pending long pendingRecordID = storePendingLargeMessage(id); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 906cbd3ba9..c24924abaa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -341,7 +341,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L // Private ------------------------------------------------------- - private synchronized void validateFile() throws ActiveMQException { + public synchronized void validateFile() throws ActiveMQException { try { if (file == null) { if (messageID <= 0) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 5b6dbfb4c4..4cc937c731 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -435,7 +435,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon SequentialFile channel1; switch (msg.getFileType()) { case LARGE_MESSAGE: { - ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false); + ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false, false); if (!(largeMessage instanceof LargeServerMessageInSync)) { ActiveMQServerLogger.LOGGER.largeMessageIncompatible(); return; @@ -536,7 +536,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { - final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true); + final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); if (message != null) { executor.execute(new Runnable() { @Override @@ -556,13 +556,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * @param packet */ private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet) throws Exception { - ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false); + ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false, true); if (message != null) { message.addBytes(packet.getBody()); } } - private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete) { + private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) { ReplicatedLargeMessage message; if (delete) { @@ -571,8 +571,14 
@@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon else { message = largeMessages.get(messageId); if (message == null) { - // No warnings if it's a delete, as duplicate deletes may be sent repeatedly. - ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId); + if (createIfNotExists) { + createLargeMessage(messageId, false); + message = largeMessages.get(messageId); + } + else { + // No warnings if it's a delete, as duplicate deletes may be sent repeatedly. + ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId); + } } } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java new file mode 100644 index 0000000000..f8d1b040e6 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** This test will add more bytes to the large message while still syncing. 
+ * At the time of writing I couldn't replicate any issues, but I'm keeping it here to validate the impl */ +@RunWith(BMUnitRunner.class) +public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase { + + public static int messageChunkCount = 0; + + private static final ReusableLatch ruleFired = new ReusableLatch(1); + private static ActiveMQServer backupServer; + private static ActiveMQServer liveServer; + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000"); + ActiveMQConnection connection; + Session session; + Queue queue; + MessageProducer producer; + + Configuration backupConfig; + + Configuration liveConfig; + + // To inform the main thread the condition is met + static final ReusableLatch flagArrived = new ReusableLatch(1); + // To wait while the condition is worked out + static final ReusableLatch flagWait = new ReusableLatch(1); + + static final ReusableLatch flag15Arrived = new ReusableLatch(1); + // To wait while the condition is worked out + static final ReusableLatch flag15Wait = new ReusableLatch(1); + + // To inform the main thread the condition is met + static final ReusableLatch flagSyncArrived = new ReusableLatch(1); + // To wait while the condition is worked out + static final ReusableLatch flagSyncWait = new ReusableLatch(1); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + System.out.println("Tmp::" + getTemporaryDir()); + + flagArrived.setCount(1); + flagWait.setCount(1); + + flag15Arrived.setCount(1); + flag15Wait.setCount(1); + + ruleFired.setCount(1); + messageChunkCount = 0; + + TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + TransportConfiguration backupConnector = 
TransportConfigurationUtils.getNettyConnector(false, 0); + TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)). + setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)). + setLargeMessagesDirectory(getLargeMessagesDir(0, true)); + + liveConfig = createDefaultInVMConfig(); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + liveServer = createServer(liveConfig); + liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue")); + liveServer.start(); + + waitForServerToStart(liveServer); + + // Just to make sure the expression worked + Assert.assertEquals(10000, factory.getMinLargeMessageSize()); + Assert.assertEquals(10000, factory.getProducerWindowSize()); + Assert.assertEquals(100, factory.getRetryInterval()); + Assert.assertEquals(-1, factory.getReconnectAttempts()); + Assert.assertTrue(factory.isHA()); + + connection = (ActiveMQConnection) factory.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue("jms.queue.Queue"); + producer = session.createProducer(queue); + + } + + private void startBackup() throws Exception { + backupServer = createServer(backupConfig); + backupServer.start(); + + waitForServerToStart(backupServer); + + } + + @After + public void stopServers() throws Exception { + if (connection != null) { + try { + connection.close(); + } + catch (Exception e) { + } + } + if (backupServer != null) { + backupServer.stop(); + backupServer = null; + } + + if (liveServer != null) { + liveServer.stop(); + liveServer = null; + } + + backupServer = liveServer = null; + } + + @Test + @BMRules( + rules = {@BMRule( + name = "InterruptSending", + targetClass = 
"org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext", + targetMethod = "sendLargeMessageChunk", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplication2Test.messageChunkSent();"), @BMRule( + name = "InterruptSync", + targetClass = "org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager", + targetMethod = "sendLargeMessageFiles", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplication2Test.syncLargeMessage();")}) + public void testSendLargeMessage() throws Exception { + + final CountDownLatch failedOver = new CountDownLatch(1); + connection.setFailoverListener(new FailoverEventListener() { + @Override + public void failoverEvent(FailoverEventType eventType) { + failedOver.countDown(); + } + }); + Thread t; + + { + final MapMessage message = createLargeMessage(); + + t = new Thread() { + public void run() { + try { + producer.send(message); + session.commit(); + } + catch (JMSException expected) { + expected.printStackTrace(); + } + } + }; + } + + t.start(); + + // I'm trying to simulate the following race here: + // The message is syncing while the client is already sending the body of the message + + Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS)); + + startBackup(); + + Assert.assertTrue(flagSyncArrived.await(10, TimeUnit.SECONDS)); + + flagWait.countDown(); + + Assert.assertTrue(flag15Arrived.await(10, TimeUnit.SECONDS)); + + flag15Wait.countDown(); + + t.join(5000); + + flagSyncWait.countDown(); + + System.out.println("Thread joined"); + + Assert.assertFalse(t.isAlive()); + + waitForRemoteBackup(connection.getSessionFactory(), 30); + + + liveServer.stop(true); + + Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS)); + + { + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + MapMessage message = (MapMessage) 
consumer.receive(5000); + + Assert.assertNotNull(message); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length); + } + + session.commit(); + } + } + + public static void syncLargeMessage() { + + try { + flagSyncArrived.countDown(); + flagSyncWait.await(10, TimeUnit.SECONDS); + } + catch (Exception e) { + e.printStackTrace(); + } + + } + + public static void messageChunkSent() { + messageChunkCount++; + + try { + if (messageChunkCount == 10) { + flagArrived.countDown(); + flagWait.await(10, TimeUnit.SECONDS); + } + if (messageChunkCount == 15) { + flag15Arrived.countDown(); + flag15Wait.await(10, TimeUnit.SECONDS); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + + private MapMessage createLargeMessage() throws JMSException { + MapMessage message = session.createMapMessage(); + + for (int i = 0; i < 10; i++) { + message.setBytes("test" + i, new byte[1024 * 1024]); + } + return message; + } + +} \ No newline at end of file diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java new file mode 100644 index 0000000000..d1bf50fd1b --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * This test will validate the sync of large messages will not lose any messages in a specific scenario. 
+ */ +@RunWith(BMUnitRunner.class) +public class RaceOnSyncLargeMessageOverReplicationTest extends ActiveMQTestBase { + + private static ActiveMQServer backupServer; + private static ActiveMQServer liveServer; + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000"); + ActiveMQConnection connection; + Session session; + Queue queue; + MessageProducer producer; + + Configuration backupConfig; + + Configuration liveConfig; + + // To inform the main thread the condition is met + static final ReusableLatch flagArrived = new ReusableLatch(1); + // To wait while the condition is worked out + static final ReusableLatch flagWait = new ReusableLatch(1); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + System.out.println("Tmp::" + getTemporaryDir()); + + flagArrived.setCount(1); + flagWait.setCount(1); + + TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)). + setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)). 
+ setLargeMessagesDirectory(getLargeMessagesDir(0, true)); + + liveConfig = createDefaultInVMConfig(); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + liveServer = createServer(liveConfig); + liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue")); + liveServer.start(); + + waitForServerToStart(liveServer); + + // Just to make sure the expression worked + Assert.assertEquals(10000, factory.getMinLargeMessageSize()); + Assert.assertEquals(10000, factory.getProducerWindowSize()); + Assert.assertEquals(100, factory.getRetryInterval()); + Assert.assertEquals(-1, factory.getReconnectAttempts()); + Assert.assertTrue(factory.isHA()); + + connection = (ActiveMQConnection) factory.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue("jms.queue.Queue"); + producer = session.createProducer(queue); + + } + + private void startBackup() throws Exception { + backupServer = createServer(backupConfig); + backupServer.start(); + + waitForServerToStart(backupServer); + + } + + @After + public void stopServers() throws Exception { + if (connection != null) { + try { + connection.close(); + } + catch (Exception e) { + } + } + if (backupServer != null) { + backupServer.stop(); + backupServer = null; + } + + if (liveServer != null) { + liveServer.stop(); + liveServer = null; + } + + backupServer = liveServer = null; + } + + /* + * Pauses the sender at the exit of JournalStorageManager.createLargeMessage (Byteman rule below) while the backup + * starts and syncs, then fails over and checks that the large message is still received in full + */ + @Test + @BMRules( + rules = {@BMRule( + name = "InterruptSync", + targetClass = "org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager", + targetMethod = "createLargeMessage(long,org.apache.activemq.artemis.core.message.impl.MessageInternal)", + targetLocation
= "EXIT", + action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplicationTest.syncLargeMessage();")}) + public void testSendLargeMessage() throws Exception { + + final CountDownLatch failedOver = new CountDownLatch(1); + connection.setFailoverListener(new FailoverEventListener() { + @Override + public void failoverEvent(FailoverEventType eventType) { + failedOver.countDown(); + } + }); + Thread t; + + { + final MapMessage message = createLargeMessage(); + + t = new Thread() { + public void run() { + try { + producer.send(message); + session.commit(); + } + catch (JMSException expected) { + expected.printStackTrace(); + } + } + }; + } + + t.start(); + + // I'm trying to simulate the following race here: + // The message is syncing while the client is already sending the body of the message + + Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS)); + + startBackup(); + + waitForRemoteBackup(connection.getSessionFactory(), 30); + + flagWait.countDown(); + + t.join(5000); + + System.out.println("Thread joined"); + + Assert.assertFalse(t.isAlive()); + + + liveServer.stop(true); + + Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS)); + + { + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + MapMessage message = (MapMessage) consumer.receive(5000); + + Assert.assertNotNull(message); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length); + } + + session.commit(); + } + } + + public static void syncLargeMessage() { + try { + flagArrived.countDown(); + flagWait.await(10, TimeUnit.SECONDS); + } + catch (Exception e) { + e.printStackTrace(); + } + + } + + private MapMessage createLargeMessage() throws JMSException { + MapMessage message = session.createMapMessage(); + + for (int i = 0; i < 10; i++) { + message.setBytes("test" + i, new byte[1024 * 1024]); + } + return message; + } + +} \ No newline at end of file