From 22d5b51a0c69b48665f32ea19bad5046d7237426 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 21 Feb 2017 17:03:46 +0000 Subject: [PATCH] [AMQ-6606] avoid partial writes to the end of the journal - revert offset increment on ioexception, fix and test (cherry picked from commit d53b8f8d424e3cf51646b215007fc017717edf44) --- .../store/kahadb/MessageDatabase.java | 2 +- .../store/kahadb/disk/journal/DataFile.java | 4 + .../kahadb/disk/journal/DataFileAppender.java | 41 +++++----- .../DataFileAppenderNoSpaceNoBatchTest.java | 80 +++++++++++++++++++ 4 files changed, 106 insertions(+), 21 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 2db07f1d61..5ee6c4cb74 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1132,7 +1132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } return location; } catch (IOException ioe) { - LOG.error("KahaDB failed to store to Journal", ioe); + LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); brokerService.handleIOException(ioe); throw ioe; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index 5b96adfc43..1532f08661 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -72,6 +72,10 @@ public class DataFile extends LinkedNode implements Comparable seekPositions = Collections.synchronizedList(new ArrayList()); + + final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) { + public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException { + + return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") { + + public void seek(long pos) throws IOException { + seekPositions.add(pos); + } + + public void write(byte[] bytes, int offset, int len) throws IOException { + throw new IOException("No space on device"); + } + }; + }; + }; + + underTest = new DataFileAppender(new Journal() { + @Override + public DataFile getCurrentDataFile(int capacity) throws IOException { + return currentDataFile; + }; + }); + + final ByteSequence byteSequence = new ByteSequence(new byte[4*1024]); + for (int i=0; i<2; i++) { + try { + underTest.storeItem(byteSequence, (byte) 1, true); + fail("expect no space"); + } catch (IOException expected) { + } + } + + assertEquals("got 2 seeks: " + seekPositions, 2, seekPositions.size()); + assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1)); + + } +}