From ac3d09e7fd0a003d2805456d16de2ba3abbf13d2 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 15 Jun 2019 10:37:06 +0800 Subject: [PATCH] HBASE-22577 BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture consume too much CPU time --- ...fferedMutatorOverAsyncBufferedMutator.java | 86 +++++++++++-------- .../hbase/client/TestBufferedMutator.java | 3 +- 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java index a7d4595c475..b8bc55c47c3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java @@ -23,16 +23,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}. @@ -40,17 +44,20 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator { + private static final Logger LOG = + LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class); + private final AsyncBufferedMutator mutator; private final ExceptionListener listener; - private List> futures = new ArrayList<>(); + private final Set> futures = ConcurrentHashMap.newKeySet(); + + private final AtomicLong bufferedSize = new AtomicLong(0); private final ConcurrentLinkedQueue> errors = new ConcurrentLinkedQueue<>(); - private final static int BUFFERED_FUTURES_THRESHOLD = 1024; - BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator, ExceptionListener listener) { this.mutator = mutator; @@ -100,62 +107,69 @@ class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator { return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts); } + private void internalFlush() throws RetriesExhaustedWithDetailsException { + // should get the future array before calling mutator.flush, otherwise we may hit an infinite + // wait, since someone may add new future to the map after we calling the flush. + CompletableFuture[] toWait = futures.toArray(new CompletableFuture[0]); + mutator.flush(); + try { + CompletableFuture.allOf(toWait).join(); + } catch (CompletionException e) { + // just ignore, we will record the actual error in the errors field + LOG.debug("Flush failed, you should get an exception thrown to your code", e); + } + if (!errors.isEmpty()) { + RetriesExhaustedWithDetailsException error = makeError(); + listener.onException(error, this); + } + } + @Override public void mutate(List mutations) throws IOException { - List> toBuffered = new ArrayList<>(); List> fs = mutator.mutate(mutations); for (int i = 0, n = fs.size(); i < n; i++) { CompletableFuture toComplete = new CompletableFuture<>(); - final int index = i; - addListener(fs.get(index), (r, e) -> { + futures.add(toComplete); + Mutation mutation = mutations.get(i); + long heapSize = mutation.heapSize(); + bufferedSize.addAndGet(heapSize); + addListener(fs.get(i), (r, e) -> { + futures.remove(toComplete); + bufferedSize.addAndGet(-heapSize); if (e != null) { - errors.add(Pair.newPair(mutations.get(index), e)); + errors.add(Pair.newPair(mutation, e)); toComplete.completeExceptionally(e); } else { toComplete.complete(r); } }); - toBuffered.add(toComplete); } synchronized (this) { - futures.addAll(toBuffered); - if (futures.size() > BUFFERED_FUTURES_THRESHOLD) { - tryCompleteFuture(); - } - if (!errors.isEmpty()) { + if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) { + // We have too many mutations which are not completed yet, let's call a flush to release the + // memory to prevent OOM + // We use buffer size * 2 is because that, the async buffered mutator will flush + // automatically when the write buffer size limit is reached, so usually we do not need to + // call flush explicitly if the buffered size is only a little larger than the buffer size + // limit. But if the buffered size is too large(2 times of the buffer size), we still need + // to block here to prevent OOM. + internalFlush(); + } else if (!errors.isEmpty()) { RetriesExhaustedWithDetailsException error = makeError(); listener.onException(error, this); } } } - private void tryCompleteFuture() { - futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList()); - } - @Override - public void close() throws IOException { - flush(); + public synchronized void close() throws IOException { + internalFlush(); mutator.close(); } @Override - public void flush() throws IOException { - mutator.flush(); - synchronized (this) { - List> toComplete = this.futures; - this.futures = new ArrayList<>(); - try { - CompletableFuture.allOf(toComplete.toArray(new CompletableFuture[toComplete.size()])) - .join(); - } catch (CompletionException e) { - // just ignore, we will record the actual error in the errors field - } - if (!errors.isEmpty()) { - RetriesExhaustedWithDetailsException error = makeError(); - listener.onException(error, this); - } - } + public synchronized void flush() throws IOException { + internalFlush(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java index 23e69ee82b1..3c660d90509 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -66,7 +66,8 @@ public class TestBufferedMutator { @Test public void test() throws Exception { - try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) { + try (BufferedMutator mutator = TEST_UTIL.getConnection() + .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) { mutator.mutate(IntStream.range(0, COUNT / 2) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) .collect(Collectors.toList()));