VersionedIntervalTimeline: Optimize construction with heavily populated holders. (#5777)

* VersionedIntervalTimeline: Optimize construction with heavily populated holders.

Each time a segment is "add"ed to a timeline, "isComplete" is called on the holder
that it is added to. "isComplete" is an O(segments in the holder) operation, so
adding N segments to the same holder is O(N^2) overall. This blows up badly when a
holder contains thousands of segments.

The patch defers the "isComplete" check until after all segments have been
inserted (see the sketch after this message).

* Fix imports.
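To make the complexity argument concrete, here is a minimal, self-contained Java sketch. It is illustrative only: ToyHolder, Chunk, and DeferredCompletenessCheck are made-up stand-ins for Druid's PartitionHolder and PartitionChunk, and "complete" is simplified to "every partition number is present". The point is just that an isComplete() which scans the whole holder turns a check-after-every-add loop into O(N^2) work, while a single deferred check keeps a bulk insert linear.

    // Illustrative stand-ins only; not Druid's actual classes.
    import java.util.ArrayList;
    import java.util.List;

    class Chunk
    {
      final int partitionNum;

      Chunk(int partitionNum)
      {
        this.partitionNum = partitionNum;
      }
    }

    class ToyHolder
    {
      private final List<Chunk> chunks = new ArrayList<>();
      private final int expectedPartitions;

      ToyHolder(int expectedPartitions)
      {
        this.expectedPartitions = expectedPartitions;
      }

      void add(Chunk chunk)
      {
        chunks.add(chunk);
      }

      // O(chunks in holder): scans every chunk to decide whether the holder is complete.
      boolean isComplete()
      {
        boolean[] seen = new boolean[expectedPartitions];
        for (Chunk chunk : chunks) {
          if (chunk.partitionNum >= 0 && chunk.partitionNum < expectedPartitions) {
            seen[chunk.partitionNum] = true;
          }
        }
        for (boolean s : seen) {
          if (!s) {
            return false;
          }
        }
        return true;
      }
    }

    public class DeferredCompletenessCheck
    {
      public static void main(String[] args)
      {
        final int n = 5000;
        final ToyHolder holder = new ToyHolder(n);

        // Pre-patch pattern: calling isComplete() after every add() means N scans of an
        // ever-growing holder, i.e. O(N^2) total work.
        // Post-patch pattern (what the deferred check does): insert everything first, check once.
        for (int i = 0; i < n; i++) {
          holder.add(new Chunk(i));
        }
        System.out.println("complete = " + holder.isComplete()); // single O(N) check at the end
      }
    }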
Gian Merlino 2018-05-16 09:16:59 -07:00 committed by GitHub
parent d8effff30b
commit f2cc6ce4d5
6 changed files with 114 additions and 119 deletions


@@ -20,7 +20,9 @@
 package io.druid.timeline;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -35,6 +37,7 @@ import org.joda.time.Interval;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -85,13 +88,28 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
   public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
   {
-    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-    for (final DataSegment segment : segments) {
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
+    return forSegments(segments.iterator());
+  }
+  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    addSegments(timeline, segments);
     return timeline;
   }
+  public static void addSegments(
+      VersionedIntervalTimeline<String, DataSegment> timeline,
+      Iterator<DataSegment> segments
+  )
+  {
+    timeline.addAll(
+        Iterators.transform(segments, segment -> segment.getShardSpec().createChunk(segment)),
+        DataSegment::getInterval,
+        DataSegment::getVersion
+    );
+  }
   @VisibleForTesting
   public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
   {
@@ -100,34 +118,57 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
   public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
   {
+    addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
+  }
+  private void addAll(
+      final Iterator<PartitionChunk<ObjectType>> objects,
+      final Function<ObjectType, Interval> intervalFunction,
+      final Function<ObjectType, VersionType> versionFunction
+  )
+  {
+    lock.writeLock().lock();
     try {
-      lock.writeLock().lock();
+      final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>();
-      Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
-      TimelineEntry entry = null;
+      while (objects.hasNext()) {
+        PartitionChunk<ObjectType> object = objects.next();
+        Interval interval = intervalFunction.apply(object.getObject());
+        VersionType version = versionFunction.apply(object.getObject());
+        Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
+        TimelineEntry entry;
-      if (exists == null) {
-        entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
-        TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<VersionType, TimelineEntry>(versionComparator);
-        versionEntry.put(version, entry);
-        allTimelineEntries.put(interval, versionEntry);
-      } else {
-        entry = exists.get(version);
-        if (entry == null) {
-          entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
-          exists.put(version, entry);
+        if (exists == null) {
+          entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
+          TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<>(versionComparator);
+          versionEntry.put(version, entry);
+          allTimelineEntries.put(interval, versionEntry);
+        } else {
-          PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
-          partitionHolder.add(object);
+          entry = exists.get(version);
+          if (entry == null) {
+            entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
+            exists.put(version, entry);
+          } else {
+            PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
+            partitionHolder.add(object);
+          }
         }
+        allEntries.put(entry, interval);
+      }
-      if (entry.getPartitionHolder().isComplete()) {
-        add(completePartitionsTimeline, interval, entry);
-      }
+      // "isComplete" is O(objects in holder) so defer it to the end of addAll.
+      for (Map.Entry<TimelineEntry, Interval> entry : allEntries.entrySet()) {
+        Interval interval = entry.getValue();
-      add(incompletePartitionsTimeline, interval, entry);
+        if (entry.getKey().getPartitionHolder().isComplete()) {
+          add(completePartitionsTimeline, interval, entry.getKey());
+        }
+        add(incompletePartitionsTimeline, interval, entry.getKey());
+      }
     }
     finally {
       lock.writeLock().unlock();
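The hunks for the remaining files below apply the same change at each call site. For context, a hedged caller-side sketch (not part of this commit's diff; the class TimelineBuilding and its before/after methods are hypothetical, assuming the io.druid classes from this patch plus Guava on the classpath): the old pattern invoked add() once per segment, each followed by an isComplete() scan of the holder, while the new pattern hands the whole collection to forSegments() in one call.

    // Hypothetical helper, not part of the commit; shown only to contrast the two call patterns.
    import com.google.common.collect.Ordering;
    import io.druid.timeline.DataSegment;
    import io.druid.timeline.VersionedIntervalTimeline;

    import java.util.List;

    class TimelineBuilding
    {
      // Before: one add() per segment, each triggering an O(holder) completeness scan.
      static VersionedIntervalTimeline<String, DataSegment> before(List<DataSegment> segments)
      {
        final VersionedIntervalTimeline<String, DataSegment> timeline =
            new VersionedIntervalTimeline<>(Ordering.natural());
        for (DataSegment segment : segments) {
          timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
        }
        return timeline;
      }

      // After: a single bulk call; completeness is checked once per touched timeline entry.
      static VersionedIntervalTimeline<String, DataSegment> after(List<DataSegment> segments)
      {
        return VersionedIntervalTimeline.forSegments(segments);
      }
    }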


@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import io.druid.common.utils.UUIDUtils;
 import io.druid.indexer.hadoop.DatasourceIngestionSpec;
 import io.druid.indexer.hadoop.WindowedDataSegment;
@@ -196,11 +195,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
       }
     }
-    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-    for (DataSegment segment : segmentsList) {
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
+    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsList);
     final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
     for (Interval interval : ingestionSpecObj.getIntervals()) {
       final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(interval);


@@ -27,8 +27,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.java.util.common.Intervals;
-import io.druid.java.util.common.guava.Comparators;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
@@ -36,6 +34,7 @@ import io.druid.segment.IndexableAdapter;
 import io.druid.segment.QueryableIndexIndexableAdapter;
 import io.druid.segment.RowFilteringIndexAdapter;
 import io.druid.segment.RowPointer;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
@@ -76,13 +75,7 @@ public class AppendTask extends MergeTaskBase
   public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
       throws Exception
   {
-    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
-        Comparators.naturalNullsFirst()
-    );
-    for (DataSegment segment : segments.keySet()) {
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
+    VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments.keySet());
     final Iterable<SegmentToMergeHolder> segmentsToMerge = Iterables.concat(
         Iterables.transform(


@@ -26,8 +26,8 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
@@ -208,7 +208,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       final Handle handle,
       final String dataSource,
       final List<Interval> intervals
-  ) throws IOException
+  )
   {
     if (intervals == null || intervals.isEmpty()) {
       throw new IAE("null/empty intervals");
@@ -241,29 +241,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
          .bind(2 * i + 2, interval.getStart().toString());
     }
-    final ResultIterator<byte[]> dbSegments = sql
-        .map(ByteArrayMapper.FIRST)
-        .iterator();
-    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
-        Ordering.natural()
-    );
-    while (dbSegments.hasNext()) {
-      final byte[] payload = dbSegments.next();
-      DataSegment segment = jsonMapper.readValue(
-          payload,
-          DataSegment.class
+    try (final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
+      return VersionedIntervalTimeline.forSegments(
+          Iterators.transform(
+              dbSegments,
+              payload -> {
+                try {
+                  return jsonMapper.readValue(payload, DataSegment.class);
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+          )
       );
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
-    dbSegments.close();
-    return timeline;
   }

   /**
@@ -584,7 +576,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               StringUtils.format(
                   "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
                   + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
-                  dbTables.getPendingSegmentsTable(), connector.getQuoteString()
+                  dbTables.getPendingSegmentsTable(),
+                  connector.getQuoteString()
               )
           )
           .bind("id", newIdentifier.getIdentifierAsString())
@@ -656,7 +649,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               StringUtils.format(
                   "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
                   + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                  dbTables.getSegmentsTable(), connector.getQuoteString()
+                  dbTables.getSegmentsTable(),
+                  connector.getQuoteString()
               )
           )
           .bind("id", segment.getIdentifier())


@@ -27,8 +27,8 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -179,55 +179,30 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     try {
       final IDBI dbi = connector.getDBI();
       VersionedIntervalTimeline<String, DataSegment> segmentTimeline = connector.inReadOnlyTransaction(
-          new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>()
-          {
-            @Override
-            public VersionedIntervalTimeline<String, DataSegment> inTransaction(
-                Handle handle, TransactionStatus status
-            )
-            {
-              return handle
-                  .createQuery(StringUtils.format(
-                      "SELECT payload FROM %s WHERE dataSource = :dataSource",
-                      getSegmentsTable()
-                  ))
-                  .setFetchSize(connector.getStreamingFetchSize())
-                  .bind("dataSource", ds)
-                  .map(ByteArrayMapper.FIRST)
-                  .fold(
-                      new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
-                      new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
-                      {
-                        @Override
-                        public VersionedIntervalTimeline<String, DataSegment> fold(
-                            VersionedIntervalTimeline<String, DataSegment> timeline,
-                            byte[] payload,
-                            FoldController foldController,
-                            StatementContext statementContext
-                        ) throws SQLException
-                        {
-                          try {
-                            final DataSegment segment = DATA_SEGMENT_INTERNER.intern(jsonMapper.readValue(
-                                payload,
-                                DataSegment.class
-                            ));
+          (handle, status) -> VersionedIntervalTimeline.forSegments(
+              Iterators.transform(
+                  handle
+                      .createQuery(
+                          StringUtils.format(
+                              "SELECT payload FROM %s WHERE dataSource = :dataSource",
+                              getSegmentsTable()
+                          )
+                      )
+                      .setFetchSize(connector.getStreamingFetchSize())
+                      .bind("dataSource", ds)
+                      .map(ByteArrayMapper.FIRST)
+                      .iterator(),
+                  payload -> {
+                    try {
+                      return DATA_SEGMENT_INTERNER.intern(jsonMapper.readValue(payload, DataSegment.class));
+                    }
+                    catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+              )
-                            timeline.add(
-                                segment.getInterval(),
-                                segment.getVersion(),
-                                segment.getShardSpec().createChunk(segment)
-                            );
-                            return timeline;
-                          }
-                          catch (Exception e) {
-                            throw new SQLException(e.toString());
-                          }
-                        }
-                      }
-                  );
-            }
-          }
+          )
       );
       final List<DataSegment> segments = Lists.newArrayList();
@@ -537,7 +512,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
           .createQuery(
               StringUtils.format(
                   "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false ORDER BY start, %2$send%2$s",
-                  getSegmentsTable(), connector.getQuoteString()
+                  getSegmentsTable(),
+                  connector.getQuoteString()
               )
           )
          .setFetchSize(connector.getStreamingFetchSize())


@@ -65,11 +65,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
           timelines.put(dataSource.getName(), timeline);
         }
-        for (DataSegment segment : dataSource.getSegments()) {
-          timeline.add(
-              segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
-          );
-        }
+        VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
       }
     }
   }