HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
0068b95c85
commit
79d9403a79
|
@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
break;
|
||||
}
|
||||
AsyncRequestFuture asf;
|
||||
try (QueueRowAccess access = new QueueRowAccess()) {
|
||||
try (QueueRowAccess access = createQueueRowAccess()) {
|
||||
if (access.isEmpty()) {
|
||||
// It means someone has gotten the ticker to run the flush.
|
||||
break;
|
||||
|
@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
return currentWriteBufferSize.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Count the mutations which haven't been processed.
|
||||
* @return count of undealt mutation
|
||||
*/
|
||||
@VisibleForTesting
|
||||
int size() {
|
||||
return undealtMutationCount.get();
|
||||
}
|
||||
|
||||
private class QueueRowAccess implements RowAccess<Row>, Closeable {
|
||||
/**
|
||||
* Count the mutations which haven't been flushed
|
||||
* @return count of unflushed mutation
|
||||
*/
|
||||
@VisibleForTesting
|
||||
int getUnflushedSize() {
|
||||
return writeAsyncBuffer.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
QueueRowAccess createQueueRowAccess() {
|
||||
return new QueueRowAccess();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
class QueueRowAccess implements RowAccess<Row>, Closeable {
|
||||
private int remainder = undealtMutationCount.getAndSet(0);
|
||||
private Mutation last = null;
|
||||
|
||||
private void restoreLastMutation() {
|
||||
// restore the last mutation since it isn't submitted
|
||||
if (last != null) {
|
||||
writeAsyncBuffer.add(last);
|
||||
currentWriteBufferSize.addAndGet(last.heapSize());
|
||||
last = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
restoreLastMutation();
|
||||
if (remainder > 0) {
|
||||
undealtMutationCount.addAndGet(remainder);
|
||||
remainder = 0;
|
||||
|
@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
@Override
|
||||
public Iterator<Row> iterator() {
|
||||
return new Iterator<Row>() {
|
||||
private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
|
||||
private int countDown = remainder;
|
||||
private Mutation last = null;
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (countDown <= 0) {
|
||||
return false;
|
||||
}
|
||||
return iter.hasNext();
|
||||
return countDown > 0;
|
||||
}
|
||||
@Override
|
||||
public Row next() {
|
||||
restoreLastMutation();
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
last = iter.next();
|
||||
last = writeAsyncBuffer.poll();
|
||||
if (last == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
currentWriteBufferSize.addAndGet(-last.heapSize());
|
||||
--countDown;
|
||||
return last;
|
||||
}
|
||||
|
@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
if (last == null) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
iter.remove();
|
||||
currentWriteBufferSize.addAndGet(-last.heapSize());
|
||||
--remainder;
|
||||
last = null;
|
||||
}
|
||||
|
|
|
@ -1795,4 +1795,48 @@ public class TestAsyncProcess {
|
|||
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
|
||||
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueRowAccess() throws Exception {
|
||||
ClusterConnection conn = createHConnection();
|
||||
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
|
||||
new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
|
||||
Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
|
||||
Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
|
||||
mutator.mutate(p0);
|
||||
BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
|
||||
// QueueRowAccess should take all undealt mutations
|
||||
assertEquals(0, mutator.size());
|
||||
mutator.mutate(p1);
|
||||
assertEquals(1, mutator.size());
|
||||
BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
|
||||
// QueueRowAccess should take all undealt mutations
|
||||
assertEquals(0, mutator.size());
|
||||
assertEquals(1, ra0.size());
|
||||
assertEquals(1, ra1.size());
|
||||
Iterator<Row> iter0 = ra0.iterator();
|
||||
Iterator<Row> iter1 = ra1.iterator();
|
||||
assertTrue(iter0.hasNext());
|
||||
assertTrue(iter1.hasNext());
|
||||
// the next() will poll the mutation from inner buffer and update the buffer count
|
||||
assertTrue(iter0.next() == p0);
|
||||
assertEquals(1, mutator.getUnflushedSize());
|
||||
assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
|
||||
assertTrue(iter1.next() == p1);
|
||||
assertEquals(0, mutator.getUnflushedSize());
|
||||
assertEquals(0, mutator.getCurrentWriteBufferSize());
|
||||
assertFalse(iter0.hasNext());
|
||||
assertFalse(iter1.hasNext());
|
||||
// ra0 doest handle the mutation so the mutation won't be pushed back to buffer
|
||||
iter0.remove();
|
||||
ra0.close();
|
||||
assertEquals(0, mutator.size());
|
||||
assertEquals(0, mutator.getUnflushedSize());
|
||||
assertEquals(0, mutator.getCurrentWriteBufferSize());
|
||||
// ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
|
||||
ra1.close();
|
||||
assertEquals(1, mutator.size());
|
||||
assertEquals(1, mutator.getUnflushedSize());
|
||||
assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
|
|||
* Initializes the compression after the shared stuff has been initialized. Called once.
|
||||
*/
|
||||
protected abstract void initAfterCompression() throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Initializes the compression after the shared stuff has been initialized. Called once.
|
||||
* @param cellCodecClsName class name of cell Codec
|
||||
|
|
Loading…
Reference in New Issue