From a24a2d80aeaf8f047d676e7260900fe916f36b78 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Thu, 3 Jan 2019 21:00:23 +0800 Subject: [PATCH] make find sinks to persist run in one callable together with the actual persist work --- .../appenderator/AppenderatorImpl.java | 67 +++++++++---------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 22c1ccc90d9..329e9d9b3e6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -90,7 +90,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -482,38 +481,6 @@ public class AppenderatorImpl implements Appenderator { throwPersistErrorIfExists(); - final Map currentHydrants = new HashMap<>(); - final List> indexesToPersist = new ArrayList<>(); - int numPersistedRows = 0; - long bytesPersisted = 0L; - Iterator> iterator = sinks.entrySet().iterator(); - - while (iterator.hasNext()) { - final Map.Entry entry = iterator.next(); - final SegmentIdentifier identifier = entry.getKey(); - final Sink sink = entry.getValue(); - if (sink == null) { - throw new ISE("No sink for identifier: %s", identifier); - } - final List hydrants = Lists.newArrayList(sink); - currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size()); - numPersistedRows += sink.getNumRowsInMemory(); - bytesPersisted += sink.getBytesInMemory(); - - final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); - - for (FireHydrant hydrant : hydrants.subList(0, limit)) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); - indexesToPersist.add(Pair.of(hydrant, identifier)); - } - } - - if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), identifier)); - } - } - log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); final String threadName = StringUtils.format("%s-incremental-persist", schema.getDataSource()); @@ -526,6 +493,37 @@ public class AppenderatorImpl implements Appenderator @Override public Object doCall() throws IOException { + final Map currentHydrants = new HashMap<>(); + final List> indexesToPersist = new ArrayList<>(); + int numPersistedRows = 0; + long bytesPersisted = 0L; + for (SegmentIdentifier identifier : sinks.keySet()) { + final Sink sink = sinks.get(identifier); + if (sink == null) { + throw new ISE("No sink for identifier: %s", identifier); + } + final List hydrants = Lists.newArrayList(sink); + currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size()); + numPersistedRows += sink.getNumRowsInMemory(); + bytesPersisted += sink.getBytesInMemory(); + + final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); + + for (FireHydrant hydrant : hydrants.subList(0, limit)) { + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); + indexesToPersist.add(Pair.of(hydrant, identifier)); + } + } + + if (sink.swappable()) { + indexesToPersist.add(Pair.of(sink.swap(), identifier)); + } + } + // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. + rowsCurrentlyInMemory.addAndGet(-numPersistedRows); + bytesCurrentlyInMemory.addAndGet(-bytesPersisted); + try { for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs)); @@ -587,9 +585,6 @@ public class AppenderatorImpl implements Appenderator runExecStopwatch.stop(); resetNextFlush(); - // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. - rowsCurrentlyInMemory.addAndGet(-numPersistedRows); - bytesCurrentlyInMemory.addAndGet(-bytesPersisted); return future; }