HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Chia-Ping Tsai 2018-02-18 21:45:04 +08:00 committed by Andrew Purtell
parent c5ca3c2fe6
commit bc1ac49de2
3 changed files with 81 additions and 12 deletions

View File

@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
break;
}
AsyncRequestFuture asf;
try (QueueRowAccess access = new QueueRowAccess()) {
try (QueueRowAccess access = createQueueRowAccess()) {
if (access.isEmpty()) {
// It means someone has gotten the ticker to run the flush.
break;
@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator {
return currentWriteBufferSize.get();
}
/**
* Count the mutations which haven't been processed.
* @return count of undealt mutation
*/
@VisibleForTesting
int size() {
return undealtMutationCount.get();
}
private class QueueRowAccess implements RowAccess<Row>, Closeable {
/**
* Count the mutations which haven't been flushed
* @return count of unflushed mutation
*/
@VisibleForTesting
int getUnflushedSize() {
return writeAsyncBuffer.size();
}
@VisibleForTesting
QueueRowAccess createQueueRowAccess() {
return new QueueRowAccess();
}
@VisibleForTesting
class QueueRowAccess implements RowAccess<Row>, Closeable {
private int remainder = undealtMutationCount.getAndSet(0);
private Mutation last = null;
private void restoreLastMutation() {
// restore the last mutation since it isn't submitted
if (last != null) {
writeAsyncBuffer.add(last);
currentWriteBufferSize.addAndGet(last.heapSize());
last = null;
}
}
@Override
public void close() {
restoreLastMutation();
if (remainder > 0) {
undealtMutationCount.addAndGet(remainder);
remainder = 0;
@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override
public Iterator<Row> iterator() {
return new Iterator<Row>() {
private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
private int countDown = remainder;
private Mutation last = null;
@Override
public boolean hasNext() {
if (countDown <= 0) {
return false;
}
return iter.hasNext();
return countDown > 0;
}
@Override
public Row next() {
restoreLastMutation();
if (!hasNext()) {
throw new NoSuchElementException();
}
last = iter.next();
last = writeAsyncBuffer.poll();
if (last == null) {
throw new NoSuchElementException();
}
currentWriteBufferSize.addAndGet(-last.heapSize());
--countDown;
return last;
}
@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (last == null) {
throw new IllegalStateException();
}
iter.remove();
currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder;
last = null;
}

View File

@ -1795,4 +1795,48 @@ public class TestAsyncProcess {
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
}
@Test
public void testQueueRowAccess() throws Exception {
ClusterConnection conn = createHConnection();
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
mutator.mutate(p0);
BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
// QueueRowAccess should take all undealt mutations
assertEquals(0, mutator.size());
mutator.mutate(p1);
assertEquals(1, mutator.size());
BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
// QueueRowAccess should take all undealt mutations
assertEquals(0, mutator.size());
assertEquals(1, ra0.size());
assertEquals(1, ra1.size());
Iterator<Row> iter0 = ra0.iterator();
Iterator<Row> iter1 = ra1.iterator();
assertTrue(iter0.hasNext());
assertTrue(iter1.hasNext());
// the next() will poll the mutation from inner buffer and update the buffer count
assertTrue(iter0.next() == p0);
assertEquals(1, mutator.getUnflushedSize());
assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
assertTrue(iter1.next() == p1);
assertEquals(0, mutator.getUnflushedSize());
assertEquals(0, mutator.getCurrentWriteBufferSize());
assertFalse(iter0.hasNext());
assertFalse(iter1.hasNext());
// ra0 doest handle the mutation so the mutation won't be pushed back to buffer
iter0.remove();
ra0.close();
assertEquals(0, mutator.size());
assertEquals(0, mutator.getUnflushedSize());
assertEquals(0, mutator.getCurrentWriteBufferSize());
// ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
ra1.close();
assertEquals(1, mutator.size());
assertEquals(1, mutator.getUnflushedSize());
assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
}
}

View File

@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
* Initializes the compression after the shared stuff has been initialized. Called once.
*/
protected abstract void initAfterCompression() throws IOException;
/**
* Initializes the compression after the shared stuff has been initialized. Called once.
* @param cellCodecClsName class name of cell Codec