make find sinks to persist run in one callable together with the actual persist work

This commit is contained in:
qiumingming.2018 2019-01-03 21:00:23 +08:00
parent c9a6de7fc5
commit a24a2d80ae
1 changed files with 31 additions and 36 deletions

View File

@ -90,7 +90,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -482,38 +481,6 @@ public class AppenderatorImpl implements Appenderator
{
throwPersistErrorIfExists();
final Map<String, Integer> currentHydrants = new HashMap<>();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0;
long bytesPersisted = 0L;
Iterator<Map.Entry<SegmentIdentifier, Sink>> iterator = sinks.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<SegmentIdentifier, Sink> entry = iterator.next();
final SegmentIdentifier identifier = entry.getKey();
final Sink sink = entry.getValue();
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
for (FireHydrant hydrant : hydrants.subList(0, limit)) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
indexesToPersist.add(Pair.of(hydrant, identifier));
}
}
if (sink.swappable()) {
indexesToPersist.add(Pair.of(sink.swap(), identifier));
}
}
log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
final String threadName = StringUtils.format("%s-incremental-persist", schema.getDataSource());
@ -526,6 +493,37 @@ public class AppenderatorImpl implements Appenderator
@Override
public Object doCall() throws IOException
{
final Map<String, Integer> currentHydrants = new HashMap<>();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0;
long bytesPersisted = 0L;
for (SegmentIdentifier identifier : sinks.keySet()) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
for (FireHydrant hydrant : hydrants.subList(0, limit)) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
indexesToPersist.add(Pair.of(hydrant, identifier));
}
}
if (sink.swappable()) {
indexesToPersist.add(Pair.of(sink.swap(), identifier));
}
}
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
try {
for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
@ -587,9 +585,6 @@ public class AppenderatorImpl implements Appenderator
runExecStopwatch.stop();
resetNextFlush();
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
return future;
}