HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Chia-Ping Tsai 2018-02-18 20:48:48 +08:00 committed by Andrew Purtell
parent fd8189d31d
commit 21f6830fe1
3 changed files with 85 additions and 35 deletions

View File

@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Arrays; import java.util.Arrays;
@ -288,26 +289,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
if (!synchronous) { if (!synchronous) {
QueueRowAccess taker = new QueueRowAccess(); try (QueueRowAccess taker = createQueueRowAccess()){
try {
ap.submit(tableName, taker, true, null, false); ap.submit(tableName, taker, true, null, false);
if (ap.hasError()) { if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)"); + " waiting for all operation in progress to finish (successfully or not)");
} }
} finally {
taker.restoreRemainder();
} }
} }
if (synchronous || ap.hasError()) { if (synchronous || ap.hasError()) {
QueueRowAccess taker = new QueueRowAccess(); while (true) {
try { try (QueueRowAccess taker = createQueueRowAccess()){
while (!taker.isEmpty()) { if (taker.isEmpty()) {
ap.submit(tableName, taker, true, null, false); break;
taker.reset(); }
ap.submit(tableName, taker, true, null, false);
} }
} finally {
taker.restoreRemainder();
} }
RetriesExhaustedWithDetailsException error = RetriesExhaustedWithDetailsException error =
@ -444,36 +441,35 @@ public class BufferedMutatorImpl implements BufferedMutator {
return Arrays.asList(writeAsyncBuffer.toArray(new Row[0])); return Arrays.asList(writeAsyncBuffer.toArray(new Row[0]));
} }
private class QueueRowAccess implements RowAccess<Row> { @VisibleForTesting
private int remainder = undealtMutationCount.getAndSet(0); QueueRowAccess createQueueRowAccess() {
return new QueueRowAccess();
void reset() {
restoreRemainder();
remainder = undealtMutationCount.getAndSet(0);
} }
@VisibleForTesting
class QueueRowAccess implements RowAccess<Row>, Closeable {
private int remainder = undealtMutationCount.getAndSet(0);
private Mutation last = null;
@Override @Override
public Iterator<Row> iterator() { public Iterator<Row> iterator() {
return new Iterator<Row>() { return new Iterator<Row>() {
private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
private int countDown = remainder; private int countDown = remainder;
private Mutation last = null;
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (countDown <= 0) { return countDown > 0;
return false;
}
return iter.hasNext();
} }
@Override @Override
public Row next() { public Row next() {
restoreLastMutation();
if (!hasNext()) { if (!hasNext()) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
last = iter.next(); last = writeAsyncBuffer.poll();
if (last == null) { if (last == null) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
currentWriteBufferSize.addAndGet(-last.heapSize());
--countDown; --countDown;
return last; return last;
} }
@ -482,28 +478,37 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (last == null) { if (last == null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
iter.remove();
currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder; --remainder;
last = null;
} }
}; };
} }
private void restoreLastMutation() {
// restore the last mutation since it isn't submitted
if (last != null) {
writeAsyncBuffer.add(last);
currentWriteBufferSize.addAndGet(last.heapSize());
last = null;
}
}
@Override @Override
public int size() { public int size() {
return remainder; return remainder;
} }
void restoreRemainder() { @Override
public boolean isEmpty() {
return remainder <= 0;
}
@Override
public void close() {
restoreLastMutation();
if (remainder > 0) { if (remainder > 0) {
undealtMutationCount.addAndGet(remainder); undealtMutationCount.addAndGet(remainder);
remainder = 0; remainder = 0;
} }
} }
@Override
public boolean isEmpty() {
return remainder <= 0;
}
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.BufferedMutatorImpl.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; import static org.apache.hadoop.hbase.client.BufferedMutatorImpl.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -837,7 +838,7 @@ public class TestAsyncProcess {
@Override @Override
public void run(){ public void run(){
Threads.sleep(1000); Threads.sleep(1000);
Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent assertFalse(checkPoint.get()); // TODO: this is timing-dependent
ai.decrementAndGet(); ai.decrementAndGet();
ap.tasksInProgress.decrementAndGet(); ap.tasksInProgress.decrementAndGet();
checkPoint2.set(true); checkPoint2.set(true);
@ -849,7 +850,7 @@ public class TestAsyncProcess {
puts.add(p); puts.add(p);
ap.submit(DUMMY_TABLE, puts, false, null, false); ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertFalse(puts.isEmpty()); assertFalse(puts.isEmpty());
t.start(); t.start();
@ -1931,4 +1932,48 @@ public class TestAsyncProcess {
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
} }
@Test
public void testQueueRowAccess() throws Exception {
ClusterConnection conn = createHConnection();
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
mutator.mutate(p0);
BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
// QueueRowAccess should take all undealt mutations
assertEquals(0, mutator.undealtMutationCount.get());
mutator.mutate(p1);
assertEquals(1, mutator.undealtMutationCount.get());
BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
// QueueRowAccess should take all undealt mutations
assertEquals(0, mutator.undealtMutationCount.get());
assertEquals(1, ra0.size());
assertEquals(1, ra1.size());
Iterator<Row> iter0 = ra0.iterator();
Iterator<Row> iter1 = ra1.iterator();
assertTrue(iter0.hasNext());
assertTrue(iter1.hasNext());
// the next() will poll the mutation from inner buffer and update the buffer count
assertTrue(iter0.next() == p0);
assertEquals(1, mutator.writeAsyncBuffer.size());
assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get());
assertTrue(iter1.next() == p1);
assertEquals(0, mutator.writeAsyncBuffer.size());
assertEquals(0, mutator.currentWriteBufferSize.get());
assertFalse(iter0.hasNext());
assertFalse(iter1.hasNext());
// ra0 doest handle the mutation so the mutation won't be pushed back to buffer
iter0.remove();
ra0.close();
assertEquals(0, mutator.undealtMutationCount.get());
assertEquals(0, mutator.writeAsyncBuffer.size());
assertEquals(0, mutator.currentWriteBufferSize.get());
// ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
ra1.close();
assertEquals(1, mutator.undealtMutationCount.get());
assertEquals(1, mutator.writeAsyncBuffer.size());
assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get());
}
} }