diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 638955e3e8b..e33bd7ce369 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; import com.google.common.annotations.VisibleForTesting; +import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; @@ -61,7 +62,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; public class BufferedMutatorImpl implements BufferedMutator { private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); - + private final ExceptionListener listener; protected ClusterConnection connection; // non-final so can be overridden in test @@ -288,26 +289,22 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (!synchronous) { - QueueRowAccess taker = new QueueRowAccess(); - try { + try (QueueRowAccess taker = createQueueRowAccess()){ ap.submit(tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); } - } finally { - taker.restoreRemainder(); } } if (synchronous || ap.hasError()) { - QueueRowAccess taker = new QueueRowAccess(); - try { - while (!taker.isEmpty()) { + while (true) { + try (QueueRowAccess taker = createQueueRowAccess()){ + if (taker.isEmpty()) { + break; + } ap.submit(tableName, taker, true, null, false); - taker.reset(); } - } finally { - taker.restoreRemainder(); } RetriesExhaustedWithDetailsException error = @@ -444,36 +441,35 @@ public class BufferedMutatorImpl implements BufferedMutator { return Arrays.asList(writeAsyncBuffer.toArray(new Row[0])); } - private class QueueRowAccess implements RowAccess { - private int remainder = undealtMutationCount.getAndSet(0); + @VisibleForTesting + QueueRowAccess createQueueRowAccess() { + return new QueueRowAccess(); + } - void reset() { - restoreRemainder(); - remainder = undealtMutationCount.getAndSet(0); - } + @VisibleForTesting + class QueueRowAccess implements RowAccess, Closeable { + private int remainder = undealtMutationCount.getAndSet(0); + private Mutation last = null; @Override public Iterator iterator() { return new Iterator() { - private final Iterator iter = writeAsyncBuffer.iterator(); private int countDown = remainder; - private Mutation last = null; @Override public boolean hasNext() { - if (countDown <= 0) { - return false; - } - return iter.hasNext(); + return countDown > 0; } @Override public Row next() { + restoreLastMutation(); if (!hasNext()) { throw new NoSuchElementException(); } - last = iter.next(); + last = writeAsyncBuffer.poll(); if (last == null) { throw new NoSuchElementException(); } + currentWriteBufferSize.addAndGet(-last.heapSize()); --countDown; return last; } @@ -482,28 +478,37 @@ public class BufferedMutatorImpl implements BufferedMutator { if (last == null) { throw new IllegalStateException(); } - iter.remove(); - currentWriteBufferSize.addAndGet(-last.heapSize()); --remainder; + last = null; } }; } + private void restoreLastMutation() { + // restore the last mutation since it isn't submitted + if (last != null) { + writeAsyncBuffer.add(last); + currentWriteBufferSize.addAndGet(last.heapSize()); + last = null; + } + } + @Override public int size() { return remainder; } - void restoreRemainder() { + @Override + public boolean isEmpty() { + return remainder <= 0; + } + @Override + public void close() { + restoreLastMutation(); if (remainder > 0) { undealtMutationCount.addAndGet(remainder); remainder = 0; } } - - @Override - public boolean isEmpty() { - return remainder <= 0; - } } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 52960b47822..5d37ad7be12 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.BufferedMutatorImpl.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -837,7 +838,7 @@ public class TestAsyncProcess { @Override public void run(){ Threads.sleep(1000); - Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent + assertFalse(checkPoint.get()); // TODO: this is timing-dependent ai.decrementAndGet(); ap.tasksInProgress.decrementAndGet(); checkPoint2.set(true); @@ -849,7 +850,7 @@ public class TestAsyncProcess { puts.add(p); ap.submit(DUMMY_TABLE, puts, false, null, false); - Assert.assertFalse(puts.isEmpty()); + assertFalse(puts.isEmpty()); t.start(); @@ -1931,4 +1932,48 @@ public class TestAsyncProcess { LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); } + + @Test + public void testQueueRowAccess() throws Exception { + ClusterConnection conn = createHConnection(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, + new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); + Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); + Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); + mutator.mutate(p0); + BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.undealtMutationCount.get()); + mutator.mutate(p1); + assertEquals(1, mutator.undealtMutationCount.get()); + BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.undealtMutationCount.get()); + assertEquals(1, ra0.size()); + assertEquals(1, ra1.size()); + Iterator iter0 = ra0.iterator(); + Iterator iter1 = ra1.iterator(); + assertTrue(iter0.hasNext()); + assertTrue(iter1.hasNext()); + // the next() will poll the mutation from inner buffer and update the buffer count + assertTrue(iter0.next() == p0); + assertEquals(1, mutator.writeAsyncBuffer.size()); + assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get()); + assertTrue(iter1.next() == p1); + assertEquals(0, mutator.writeAsyncBuffer.size()); + assertEquals(0, mutator.currentWriteBufferSize.get()); + assertFalse(iter0.hasNext()); + assertFalse(iter1.hasNext()); + // ra0 doest handle the mutation so the mutation won't be pushed back to buffer + iter0.remove(); + ra0.close(); + assertEquals(0, mutator.undealtMutationCount.get()); + assertEquals(0, mutator.writeAsyncBuffer.size()); + assertEquals(0, mutator.currentWriteBufferSize.get()); + // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer + ra1.close(); + assertEquals(1, mutator.undealtMutationCount.get()); + assertEquals(1, mutator.writeAsyncBuffer.size()); + assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 632f146f2a6..0b2d12f2402 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -847,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // these scanners are properly closed() whether or not the scan is completed successfully // Eagerly creating scanners so that we have the ref counting ticking on the newly created // store files. In case of stream scanners this eager creation does not induce performance - // penalty because in scans (that uses stream scanners) the next() call is bound to happen. + // penalty because in scans (that uses stream scanners) the next() call is bound to happen. List scanners = store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); flushedstoreFileScanners.addAll(scanners);