add support for getting used segments for multiple intervals in IndexerMetadataStorageCoordinator

Himanshu Gupta 2015-11-18 13:00:41 -06:00
parent 2ae34aea3a
commit 221fb95d07
5 changed files with 157 additions and 21 deletions


@@ -161,6 +161,12 @@ public class IngestSegmentFirehoseFactoryTest
return ImmutableList.copyOf(segmentSet);
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> interval) throws IOException
{
return ImmutableList.copyOf(segmentSet);
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{


@@ -49,6 +49,14 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
return ImmutableList.of();
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
) throws IOException
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{


@@ -42,6 +42,19 @@ public interface IndexerMetadataStorageCoordinator
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException;
/**
* Get all segments which may include any data in the given intervals and are flagged as used.
*
* @param dataSource The datasource to query
* @param intervals  The intervals for which all applicable and used segments are requested.
*
* @return The DataSegments which include data in the requested intervals. These segments may contain data outside the requested intervals.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
throws IOException;
/**
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
* with identifiers already in the metadata storage will not be added).
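
For orientation, a minimal usage sketch of the new multi-interval lookup follows; the coordinator instance, datasource name, and interval values here are illustrative assumptions and not part of this commit.

// Sketch only: "coordinator" is assumed to be an already-wired IndexerMetadataStorageCoordinator
// (for example the SQL-backed implementation changed below).
List<DataSegment> findUsed(IndexerMetadataStorageCoordinator coordinator) throws IOException
{
  final List<Interval> intervals = ImmutableList.of(
      Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
      Interval.parse("2015-01-03T00Z/2015-01-04T00Z")
  );
  // A single call covers all intervals; a segment overlapping more than one
  // requested interval is returned only once.
  return coordinator.getUsedSegmentsForIntervals("someDataSource", intervals);
}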


@@ -21,12 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -43,6 +45,7 @@ import org.joda.time.Interval;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
@@ -53,7 +56,9 @@ import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@@ -85,10 +90,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
connector.createSegmentTable();
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
final String dataSource,
final Interval interval
) throws IOException
{
return getUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval));
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(
final String dataSource, final List<Interval> intervals
) throws IOException
{
return connector.retryWithHandle(
new HandleCallback<List<DataSegment>>()
@@ -96,16 +110,28 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalWithHandle(
final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalsWithHandle(
handle,
dataSource,
interval
intervals
);
return Lists.newArrayList(
Set<DataSegment> segments = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
timeline.lookup(interval),
Iterables.concat(
Iterables.transform(
intervals,
new Function<Interval, Iterable<TimelineObjectHolder<String, DataSegment>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, DataSegment>> apply(Interval interval)
{
return timeline.lookup(interval);
}
}
)
),
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
{
@Override
@@ -118,6 +144,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
)
);
return new ArrayList<>(segments);
}
}
);
@@ -158,29 +185,51 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
return identifiers;
}
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
final List<Interval> intervals
) throws IOException
{
if (intervals == null || intervals.isEmpty()) {
throw new IAE("null/empty intervals");
}
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
"(start <= ? AND \"end\" >= ?)"
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
Query<Map<String, Object>> sql = handle.createQuery(
String.format(
sb.toString(),
dbTables.getSegmentsTable()
)
).bind(0, dataSource);
for (int i = 0; i < intervals.size(); i++) {
Interval interval = intervals.get(i);
sql = sql
.bind(2 * i + 1, interval.getEnd().toString())
.bind(2 * i + 2, interval.getStart().toString());
}
final ResultIterator<byte[]> dbSegments = sql
.map(ByteArrayMapper.FIRST)
.iterator();
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.natural()
);
final ResultIterator<byte[]> dbSegments =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true",
dbTables.getSegmentsTable()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.iterator();
while (dbSegments.hasNext()) {
final byte[] payload = dbSegments.next();
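
As a rough, self-contained illustration of the WHERE clause that the loop above assembles (the table name here is a placeholder, not the actual Druid segments table), two intervals yield one OR'ed overlap predicate per interval:

public class IntervalClauseSketch
{
  public static void main(String[] args)
  {
    final int numIntervals = 2;
    final StringBuilder sb = new StringBuilder(
        "SELECT payload FROM segments WHERE used = true AND dataSource = ? AND ("
    );
    for (int i = 0; i < numIntervals; i++) {
      // One overlap check per interval: segment.start <= interval.end AND segment.end >= interval.start
      sb.append("(start <= ? AND \"end\" >= ?)");
      sb.append(i == numIntervals - 1 ? ")" : " OR ");
    }
    // Binding order mirrors the coordinator: position 0 is the datasource, then for interval i
    // position 2*i+1 takes the interval end and position 2*i+2 takes the interval start.
    System.out.println(sb);
  }
}

For two intervals this prints: SELECT payload FROM segments WHERE used = true AND dataSource = ? AND ((start <= ? AND "end" >= ?) OR (start <= ? AND "end" >= ?))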
@@ -301,10 +350,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
final SegmentIdentifier newIdentifier;
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalWithHandle(
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
interval
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {


@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@@ -65,6 +66,19 @@ public class IndexerSQLMetadataStorageCoordinatorTest
9,
100
);
private final DataSegment defaultSegment3 = new DataSegment(
"dataSource",
Interval.parse("2015-01-03T00Z/2015-01-04T00Z"),
"version",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NoneShardSpec(),
9,
100
);
private final Set<DataSegment> segments = ImmutableSet.of(defaultSegment, defaultSegment2);
IndexerSQLMetadataStorageCoordinator coordinator;
private TestDerbyConnector derbyConnector;
@@ -136,6 +150,52 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
@Test
public void testMultiIntervalUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3));
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment.getInterval())
)
)
);
Assert.assertEquals(
ImmutableSet.of(defaultSegment3),
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment3.getInterval())
)
)
);
Assert.assertEquals(
ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment3),
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(defaultSegment.getInterval(), defaultSegment3.getInterval())
)
)
);
// Check that segments are not duplicated when two requested intervals overlap the interval of the same segment.
Assert.assertEquals(
ImmutableList.of(defaultSegment3),
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
ImmutableList.of(Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), Interval.parse("2015-01-03T09Z/2015-01-04T00Z"))
)
);
}
@Test
public void testSimpleUnUsedList() throws IOException
{