From 7de7ba2aa92dd1a98f48175fac5a538bd6e8579b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Janczykowski?= Date: Tue, 17 Jan 2023 12:20:53 +0100 Subject: [PATCH 1/2] [AMQ-9199] Fixed race condition in creating store directory A store directory is created by MessageDatabase#getPageFile which is called in two cases: 1. KahaDBStore.start() when creating a queue 2. KahaDBStore.size() which is performed when sending any persistent message If both methods are called concurrently it's possible to get an IOException thrown from the IOHelper.mkdirs method. --- .../org/apache/activemq/util/IOHelper.java | 4 ++ .../store/kahadb/MessageDatabaseTest.java | 58 ++++++++++++++++--- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java index fb0784c950..a18179dd3f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java @@ -328,6 +328,10 @@ public final class IOHelper { } else { if (!dir.mkdirs()) { + if ( dir.exists() && dir.isDirectory() ) { + // Directory created in parallel + return; + } throw new IOException("Failed to create directory '" + dir + "'"); } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java index 604b46f996..01045f5d31 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java @@ -17,6 +17,18 @@ package org.apache.activemq.store.kahadb; +import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.kahadb.disk.journal.Journal; @@ -25,14 +37,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; -import static org.junit.Assert.*; - public class MessageDatabaseTest { @Rule @@ -114,4 +118,42 @@ public class MessageDatabaseTest { kaha.stop(); } } + + @Test + public void testKahaStartAndSizeCreatingStoreDirectoryConcurrently() throws Exception { + // given mkdirs() will execute in parallel + final CountDownLatch countDownLatch = new CountDownLatch(2); + + class ConcurrentMkdirsFile extends File { + + public ConcurrentMkdirsFile(File parent, String child) { + super(parent, child); + } + + @Override + public boolean mkdirs() { + countDownLatch.countDown(); + try { + countDownLatch.await(3, TimeUnit.SECONDS); + return super.mkdirs(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + // and KahaDBStore is configured + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new ConcurrentMkdirsFile(temporaryFolder.getRoot(), "kaha3")); + + // when both start() and size() are performed concurrently + final Future size = Executors.newSingleThreadExecutor() + .submit(kaha::size); + kaha.start(); + + // then KahaDB should successfully start + assertTrue(kaha.isStarted()); + assertTrue(size.get() > 0); + } + } \ No newline at end of file From 13ec5d5b7ace619e84ff2982da9e80091bb64e66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Janczykowski?= Date: Tue, 17 Jan 2023 12:26:21 +0100 Subject: [PATCH 2/2] [AMQ-9199] reordered imports --- .../store/kahadb/MessageDatabaseTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java index 01045f5d31..a2133747c1 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java @@ -17,10 +17,13 @@ package org.apache.activemq.store.kahadb; -import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.activemq.ActiveMQMessageAuditNoSync; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.util.ByteSequence; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -29,13 +32,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.activemq.ActiveMQMessageAuditNoSync; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.apache.activemq.util.ByteSequence; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; +import static org.junit.Assert.*; public class MessageDatabaseTest {