From f2cc6ce4d5136b86ba999d46785191662e8ac3c2 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 16 May 2018 09:16:59 -0700
Subject: [PATCH] VersionedIntervalTimeline: Optimize construction with heavily populated holders. (#5777)

* VersionedIntervalTimeline: Optimize construction with heavily populated holders.

Each time a segment is "add"ed to a timeline, "isComplete" is called on the
holder that it is added to. "isComplete" is an O(segments per chunk) operation,
meaning that adding N segments to a chunk is an O(N^2) operation. This blows up
badly if we have thousands of segments per chunk.

The patch defers the "isComplete" check until after all segments have been
inserted.

* Fix imports.
---
 .../timeline/VersionedIntervalTimeline.java   | 89 ++++++++++++++-----
 .../io/druid/indexer/HadoopIngestionSpec.java |  7 +-
 .../indexing/common/task/AppendTask.java      | 11 +--
 .../IndexerSQLMetadataStorageCoordinator.java | 44 ++++-----
 .../metadata/SQLMetadataSegmentManager.java   | 76 ++++++----------
 .../DruidCoordinatorCleanupOvershadowed.java  |  6 +-
 6 files changed, 114 insertions(+), 119 deletions(-)

diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
index 8e558804210..d628e3770da 100644
--- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
+++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
@@ -20,7 +20,9 @@
 package io.druid.timeline;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -35,6 +37,7 @@ import org.joda.time.Interval;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -85,13 +88,28 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
 
   public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
   {
-    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-    for (final DataSegment segment : segments) {
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
+    return forSegments(segments.iterator());
+  }
+
+  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    addSegments(timeline, segments);
     return timeline;
   }
 
+  public static void addSegments(
+      VersionedIntervalTimeline<String, DataSegment> timeline,
+      Iterator<DataSegment> segments
+  )
+  {
+    timeline.addAll(
+        Iterators.transform(segments, segment -> segment.getShardSpec().createChunk(segment)),
+        DataSegment::getInterval,
+        DataSegment::getVersion
+    );
+  }
+
   @VisibleForTesting
   public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
   {
@@ -100,34 +118,57 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
 
   public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
   {
+    addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
+  }
+
+  private void addAll(
+      final Iterator<PartitionChunk<ObjectType>> objects,
+      final Function<ObjectType, Interval> intervalFunction,
+      final Function<ObjectType, VersionType> versionFunction
+  )
+  {
+    lock.writeLock().lock();
+
     try {
-      lock.writeLock().lock();
+      final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>();
 
-      Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
-      TimelineEntry entry = null;
+      while (objects.hasNext()) {
+        PartitionChunk<ObjectType> object = objects.next();
+        Interval interval = intervalFunction.apply(object.getObject());
+        VersionType version = versionFunction.apply(object.getObject());
+        Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
+        TimelineEntry entry;
 
-      if (exists == null) {
-        entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
-        TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<VersionType, TimelineEntry>(versionComparator);
-        versionEntry.put(version, entry);
-        allTimelineEntries.put(interval, versionEntry);
-      } else {
-        entry = exists.get(version);
-
-        if (entry == null) {
-          entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
-          exists.put(version, entry);
+        if (exists == null) {
+          entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
+          TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<>(versionComparator);
+          versionEntry.put(version, entry);
+          allTimelineEntries.put(interval, versionEntry);
         } else {
-          PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
-          partitionHolder.add(object);
+          entry = exists.get(version);
+
+          if (entry == null) {
+            entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
+            exists.put(version, entry);
+          } else {
+            PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
+            partitionHolder.add(object);
+          }
         }
+
+        allEntries.put(entry, interval);
       }
 
-      if (entry.getPartitionHolder().isComplete()) {
-        add(completePartitionsTimeline, interval, entry);
-      }
+      // "isComplete" is O(objects in holder) so defer it to the end of addAll.
+      for (Map.Entry<TimelineEntry, Interval> entry : allEntries.entrySet()) {
+        Interval interval = entry.getValue();
 
-      add(incompletePartitionsTimeline, interval, entry);
+        if (entry.getKey().getPartitionHolder().isComplete()) {
+          add(completePartitionsTimeline, interval, entry.getKey());
+        }
+
+        add(incompletePartitionsTimeline, interval, entry.getKey());
+      }
     }
     finally {
       lock.writeLock().unlock();
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
index aea58796b65..1048733f991 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import io.druid.common.utils.UUIDUtils;
 import io.druid.indexer.hadoop.DatasourceIngestionSpec;
 import io.druid.indexer.hadoop.WindowedDataSegment;
@@ -196,11 +195,7 @@ public class HadoopIngestionSpec extends IngestionSpec
-      final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-      for (DataSegment segment : segmentsList) {
-        timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-      }
-
+      final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsList);
       final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
       for (Interval interval : ingestionSpecObj.getIntervals()) {
         final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(interval);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
index 62bdb562424..91dec2e2c45 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
@@ -27,8 +27,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.java.util.common.Intervals;
-import io.druid.java.util.common.guava.Comparators;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
@@ -36,6 +34,7 @@ import io.druid.segment.IndexableAdapter;
 import io.druid.segment.QueryableIndexIndexableAdapter;
 import io.druid.segment.RowFilteringIndexAdapter;
 import io.druid.segment.RowPointer;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
@@ -76,13 +75,7 @@ public class AppendTask extends MergeTaskBase
   public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
       throws Exception
   {
-    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
-        Comparators.naturalNullsFirst()
-    );
-
-    for (DataSegment segment : segments.keySet()) {
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
+    VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments.keySet());
 
     final Iterable segmentsToMerge = Iterables.concat(
         Iterables.transform(
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 17bd270f9c7..221a66e45ad 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -26,8 +26,8 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
@@ -208,7 +208,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       final Handle handle,
       final String dataSource,
      final List<Interval> intervals
-  ) throws IOException
+  )
   {
     if (intervals == null || intervals.isEmpty()) {
       throw new IAE("null/empty intervals");
     }
@@ -241,29 +241,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           .bind(2 * i + 2, interval.getStart().toString());
     }
 
-    final ResultIterator<byte[]> dbSegments = sql
-        .map(ByteArrayMapper.FIRST)
-        .iterator();
-
-    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
-        Ordering.natural()
-    );
-
-    while (dbSegments.hasNext()) {
-      final byte[] payload = dbSegments.next();
-
-      DataSegment segment = jsonMapper.readValue(
-          payload,
-          DataSegment.class
+    try (final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
+      return VersionedIntervalTimeline.forSegments(
+          Iterators.transform(
+              dbSegments,
+              payload -> {
+                try {
+                  return jsonMapper.readValue(payload, DataSegment.class);
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+          )
       );
-
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
-
-    dbSegments.close();
-
-    return timeline;
+    }
   }
 
   /**
@@ -584,7 +576,8 @@
             StringUtils.format(
                 "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
                 + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
-                dbTables.getPendingSegmentsTable(), connector.getQuoteString()
+                dbTables.getPendingSegmentsTable(),
+                connector.getQuoteString()
             )
         )
         .bind("id", newIdentifier.getIdentifierAsString())
@@ -656,7 +649,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             StringUtils.format(
                 "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
                 + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                dbTables.getSegmentsTable(), connector.getQuoteString()
+                dbTables.getSegmentsTable(),
+                connector.getQuoteString()
             )
         )
         .bind("id", segment.getIdentifier())
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 8d412368b59..429413c430b 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -27,8 +27,8 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -179,55 +179,30 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     try {
       final IDBI dbi = connector.getDBI();
       VersionedIntervalTimeline<String, DataSegment> segmentTimeline = connector.inReadOnlyTransaction(
-          new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>()
-          {
-            @Override
-            public VersionedIntervalTimeline<String, DataSegment> inTransaction(
-                Handle handle, TransactionStatus status
-            )
-            {
-              return handle
-                  .createQuery(StringUtils.format(
-                      "SELECT payload FROM %s WHERE dataSource = :dataSource",
-                      getSegmentsTable()
-                  ))
-                  .setFetchSize(connector.getStreamingFetchSize())
-                  .bind("dataSource", ds)
-                  .map(ByteArrayMapper.FIRST)
-                  .fold(
-                      new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
-                      new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
-                      {
-                        @Override
-                        public VersionedIntervalTimeline<String, DataSegment> fold(
-                            VersionedIntervalTimeline<String, DataSegment> timeline,
-                            byte[] payload,
-                            FoldController foldController,
-                            StatementContext statementContext
-                        ) throws SQLException
-                        {
-                          try {
-                            final DataSegment segment = DATA_SEGMENT_INTERNER.intern(jsonMapper.readValue(
-                                payload,
-                                DataSegment.class
-                            ));
+          (handle, status) -> VersionedIntervalTimeline.forSegments(
+              Iterators.transform(
+                  handle
+                      .createQuery(
+                          StringUtils.format(
+                              "SELECT payload FROM %s WHERE dataSource = :dataSource",
+                              getSegmentsTable()
+                          )
+                      )
+                      .setFetchSize(connector.getStreamingFetchSize())
+                      .bind("dataSource", ds)
+                      .map(ByteArrayMapper.FIRST)
+                      .iterator(),
+                  payload -> {
+                    try {
+                      return DATA_SEGMENT_INTERNER.intern(jsonMapper.readValue(payload, DataSegment.class));
+                    }
+                    catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+              )
 
-                            timeline.add(
-                                segment.getInterval(),
-                                segment.getVersion(),
-                                segment.getShardSpec().createChunk(segment)
-                            );
-
-                            return timeline;
-                          }
-                          catch (Exception e) {
-                            throw new SQLException(e.toString());
-                          }
-                        }
-                      }
-                  );
-            }
-          }
+          )
       );
 
       final List<DataSegment> segments = Lists.newArrayList();
@@ -537,7 +512,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
         .createQuery(
             StringUtils.format(
                 "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false ORDER BY start, %2$send%2$s",
-                getSegmentsTable(), connector.getQuoteString()
+                getSegmentsTable(),
+                connector.getQuoteString()
             )
         )
         .setFetchSize(connector.getStreamingFetchSize())
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
index 779c186ea03..da6a22ad33a 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
@@ -65,11 +65,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
             timelines.put(dataSource.getName(), timeline);
           }
 
-          for (DataSegment segment : dataSource.getSegments()) {
-            timeline.add(
-                segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
-            );
-          }
+          VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
         }
       }
     }
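
A minimal usage sketch (not part of the patch itself; loadSegments() and moreSegments are hypothetical stand-ins for whatever source of DataSegment objects a caller has). It shows the bulk-construction path added above, where all chunks are inserted first and each holder's "isComplete" check runs once at the end of addAll instead of after every single add:

    // Illustrative sketch only; loadSegments() and moreSegments are hypothetical.
    List<DataSegment> segments = loadSegments();

    // One-shot construction: every chunk is inserted, then each holder is checked
    // for completeness exactly once.
    VersionedIntervalTimeline<String, DataSegment> timeline =
        VersionedIntervalTimeline.forSegments(segments);

    // Bulk insertion into an existing timeline goes through the same deferred path.
    VersionedIntervalTimeline.addSegments(timeline, moreSegments.iterator());

The single-segment add(interval, version, chunk) method still exists and now delegates to addAll with a one-element iterator, so existing callers keep the same behavior.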