diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 9d24b4daa90..d4bc811c72c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator { break; } AsyncRequestFuture asf; - try (QueueRowAccess access = new QueueRowAccess()) { + try (QueueRowAccess access = createQueueRowAccess()) { if (access.isEmpty()) { // It means someone has gotten the ticker to run the flush. break; @@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator { return currentWriteBufferSize.get(); } + /** + * Count the mutations which haven't been processed. + * @return count of undealt mutation + */ @VisibleForTesting int size() { return undealtMutationCount.get(); } - private class QueueRowAccess implements RowAccess, Closeable { + /** + * Count the mutations which haven't been flushed + * @return count of unflushed mutation + */ + @VisibleForTesting + int getUnflushedSize() { + return writeAsyncBuffer.size(); + } + + @VisibleForTesting + QueueRowAccess createQueueRowAccess() { + return new QueueRowAccess(); + } + + @VisibleForTesting + class QueueRowAccess implements RowAccess, Closeable { private int remainder = undealtMutationCount.getAndSet(0); + private Mutation last = null; + + private void restoreLastMutation() { + // restore the last mutation since it isn't submitted + if (last != null) { + writeAsyncBuffer.add(last); + currentWriteBufferSize.addAndGet(last.heapSize()); + last = null; + } + } @Override public void close() { + restoreLastMutation(); if (remainder > 0) { undealtMutationCount.addAndGet(remainder); remainder = 0; @@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public Iterator iterator() { return new Iterator() { - private final Iterator iter = writeAsyncBuffer.iterator(); private int countDown = remainder; - private Mutation last = null; @Override public boolean hasNext() { - if (countDown <= 0) { - return false; - } - return iter.hasNext(); + return countDown > 0; } @Override public Row next() { + restoreLastMutation(); if (!hasNext()) { throw new NoSuchElementException(); } - last = iter.next(); + last = writeAsyncBuffer.poll(); if (last == null) { throw new NoSuchElementException(); } + currentWriteBufferSize.addAndGet(-last.heapSize()); --countDown; return last; } @@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements BufferedMutator { if (last == null) { throw new IllegalStateException(); } - iter.remove(); - currentWriteBufferSize.addAndGet(-last.heapSize()); --remainder; last = null; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 2979dcd5e35..4a2ed8d7473 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -1795,4 +1795,48 @@ public class TestAsyncProcess { LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); } + + @Test + public void testQueueRowAccess() throws Exception { + ClusterConnection conn = createHConnection(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, + new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); + Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); + Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); + mutator.mutate(p0); + BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.size()); + mutator.mutate(p1); + assertEquals(1, mutator.size()); + BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.size()); + assertEquals(1, ra0.size()); + assertEquals(1, ra1.size()); + Iterator iter0 = ra0.iterator(); + Iterator iter1 = ra1.iterator(); + assertTrue(iter0.hasNext()); + assertTrue(iter1.hasNext()); + // the next() will poll the mutation from inner buffer and update the buffer count + assertTrue(iter0.next() == p0); + assertEquals(1, mutator.getUnflushedSize()); + assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); + assertTrue(iter1.next() == p1); + assertEquals(0, mutator.getUnflushedSize()); + assertEquals(0, mutator.getCurrentWriteBufferSize()); + assertFalse(iter0.hasNext()); + assertFalse(iter1.hasNext()); + // ra0 doest handle the mutation so the mutation won't be pushed back to buffer + iter0.remove(); + ra0.close(); + assertEquals(0, mutator.size()); + assertEquals(0, mutator.getUnflushedSize()); + assertEquals(0, mutator.getCurrentWriteBufferSize()); + // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer + ra1.close(); + assertEquals(1, mutator.size()); + assertEquals(1, mutator.getUnflushedSize()); + assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index f242cef5389..4338f6d8e4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { * Initializes the compression after the shared stuff has been initialized. Called once. */ protected abstract void initAfterCompression() throws IOException; - + /** * Initializes the compression after the shared stuff has been initialized. Called once. * @param cellCodecClsName class name of cell Codec