diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 25c4e28e5e..3153a5088c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -355,7 +355,11 @@ class DataFileAppender implements FileAppender { synchronized (enqueueMutex) { running = false; signalError(wb, error); - signalError(nextWriteBatch, error); + if (nextWriteBatch != null) { + signalError(nextWriteBatch, error); + nextWriteBatch = null; + enqueueMutex.notifyAll(); + } } } finally { try { @@ -402,12 +406,23 @@ class DataFileAppender implements FileAppender { if (wb != null) { if (t instanceof IOException) { wb.exception.set((IOException) t); - // revert batch increment such that next write is contiguous - wb.dataFile.decrementLength(wb.size); + // revert sync batch increment such that next write is contiguous + if (syncBatch(wb.writes)) { + wb.dataFile.decrementLength(wb.size); + } } else { wb.exception.set(IOExceptionSupport.create(t)); } signalDone(wb); } } + + // async writes will already be in the index so reuse is not an option + private boolean syncBatch(LinkedNodeList writes) { + Journal.WriteCommand write = writes.getHead(); + while (write != null && write.sync) { + write = write.getNext(); + } + return write == null; + } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java index aa6df3f28a..ec68d13526 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -21,17 +21,27 @@ import org.apache.activemq.util.RecoverableRandomAccessFile; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class DataFileAppenderNoSpaceNoBatchTest { + + private static final Logger LOG = LoggerFactory.getLogger(DataFileAppenderNoSpaceNoBatchTest.class); + @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); @@ -77,4 +87,62 @@ public class DataFileAppenderNoSpaceNoBatchTest { assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1)); } + + + @Test(timeout = 10000) + public void testNoSpaceNextWriteSameBatchAsync() throws Exception { + final List seekPositions = Collections.synchronizedList(new ArrayList()); + + final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) { + public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException { + + return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") { + + public void seek(long pos) throws IOException { + seekPositions.add(pos); + } + + public void write(byte[] bytes, int offset, int len) throws IOException { + if (seekPositions.size() == 2) { + throw new IOException("No space on device: " + seekPositions.size()); + } + } + }; + }; + }; + + underTest = new DataFileAppender(new Journal() { + @Override + public DataFile getCurrentDataFile(int capacity) throws IOException { + return currentDataFile; + }; + + @Override + public int getWriteBatchSize() { + // force multiple async batches + return 4*1024; + } + }); + + final ByteSequence byteSequence = new ByteSequence(new byte[1024]); + + ConcurrentLinkedQueue locations = new ConcurrentLinkedQueue(); + HashSet latches = new HashSet(); + for (int i = 0; i <= 20; i++) { + Location location = underTest.storeItem(byteSequence, (byte) 1, false); + locations.add(location); + latches.add(location.getLatch()); + } + + for (CountDownLatch latch: latches) { + assertTrue("write complete", latch.await(5, TimeUnit.SECONDS)); + } + + LOG.info("Latches count: " + latches.size()); + LOG.info("Seeks: " + seekPositions); + + assertTrue("got more than on latch: " + latches.size(), latches.size() > 1); + assertTrue("got seeks: " + seekPositions, seekPositions.size() > 2); + assertEquals("no duplicates: " + seekPositions, seekPositions.size(), new HashSet(seekPositions).size()); + } }