mirror of https://github.com/apache/activemq.git
[AMQ-9199] Fixed race condition in creating store directory
A store directory is created by MessageDatabase#getPageFile which
is called in two cases:
1. KahaDBStore.start() when creating a queue
2. KahaDBStore.size() which is performed when sending any persistent message
If both methods are called concurrently it's possible to get an IOException
thrown from the IOHelper.mkdirs method.
(cherry picked from commit 7de7ba2aa9
)
This commit is contained in:
parent
ce7782a06f
commit
c5ecf53bdc
|
@ -328,6 +328,10 @@ public final class IOHelper {
|
|||
|
||||
} else {
|
||||
if (!dir.mkdirs()) {
|
||||
if ( dir.exists() && dir.isDirectory() ) {
|
||||
// Directory created in parallel
|
||||
return;
|
||||
}
|
||||
throw new IOException("Failed to create directory '" + dir + "'");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,18 @@
|
|||
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageAuditNoSync;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||
|
@ -25,14 +37,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class MessageDatabaseTest {
|
||||
|
||||
@Rule
|
||||
|
@ -114,4 +118,42 @@ public class MessageDatabaseTest {
|
|||
kaha.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKahaStartAndSizeCreatingStoreDirectoryConcurrently() throws Exception {
|
||||
// given mkdirs() will execute in parallel
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(2);
|
||||
|
||||
class ConcurrentMkdirsFile extends File {
|
||||
|
||||
public ConcurrentMkdirsFile(File parent, String child) {
|
||||
super(parent, child);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs() {
|
||||
countDownLatch.countDown();
|
||||
try {
|
||||
countDownLatch.await(3, TimeUnit.SECONDS);
|
||||
return super.mkdirs();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// and KahaDBStore is configured
|
||||
KahaDBStore kaha = new KahaDBStore();
|
||||
kaha.setDirectory(new ConcurrentMkdirsFile(temporaryFolder.getRoot(), "kaha3"));
|
||||
|
||||
// when both start() and size() are performed concurrently
|
||||
final Future<Long> size = Executors.newSingleThreadExecutor()
|
||||
.submit(kaha::size);
|
||||
kaha.start();
|
||||
|
||||
// then KahaDB should successfully start
|
||||
assertTrue(kaha.isStarted());
|
||||
assertTrue(size.get() > 0);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue