Exclude pagingIdentifiers that don't apply to a datasource (#4078)

* exclude pagingIdentifiers that don't apply to a datasource to support union datasources

* code review changes

* code review changes
This commit is contained in:
David Lim 2017-03-22 13:32:27 -06:00 committed by Fangjin Yang
parent 1f48198607
commit f68ba4128f
4 changed files with 98 additions and 32 deletions

View File

@ -21,6 +21,7 @@ package io.druid.timeline;
import com.google.common.base.Function;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import org.joda.time.DateTime;
@ -44,51 +45,66 @@ public class DataSegmentUtils
@Override
public Interval apply(String identifier)
{
return valueOf(datasource, identifier).getInterval();
SegmentIdentifierParts segmentIdentifierParts = valueOf(datasource, identifier);
if (segmentIdentifierParts == null) {
throw new IAE("Invalid identifier [%s]", identifier);
}
return segmentIdentifierParts.getInterval();
}
};
}
// ignores shard spec
/**
* Parses a segment identifier into its components: dataSource, interval, version, and any trailing tags. Ignores
* shard spec.
*
* It is possible that this method may incorrectly parse an identifier, for example if the dataSource name in the
* identifier contains a DateTime parseable string such as 'datasource_2000-01-01T00:00:00.000Z' and dataSource was
* provided as 'datasource'. The desired behavior in this case would be to return null since the identifier does not
* actually belong to the provided dataSource but a non-null result would be returned. This is an edge case that would
* currently only affect paged select queries with a union dataSource of two similarly-named dataSources as in the
* given example.
*
* @param dataSource the dataSource corresponding to this identifier
* @param identifier segment identifier
* @return a {@link io.druid.timeline.DataSegmentUtils.SegmentIdentifierParts} object if the identifier could be
* parsed, null otherwise
*/
public static SegmentIdentifierParts valueOf(String dataSource, String identifier)
{
SegmentIdentifierParts segmentDesc = parse(dataSource, identifier);
if (segmentDesc == null) {
throw new IllegalArgumentException("Invalid identifier " + identifier);
}
return segmentDesc;
}
private static SegmentIdentifierParts parse(String dataSource, String identifier)
{
if (!identifier.startsWith(String.format("%s_", dataSource))) {
LOGGER.info("Invalid identifier %s", identifier);
return null;
}
String remaining = identifier.substring(dataSource.length() + 1);
String[] splits = remaining.split(DataSegment.delimiter);
if (splits.length < 3) {
LOGGER.info("Invalid identifier %s", identifier);
return null;
}
DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
DateTime start = formatter.parseDateTime(splits[0]);
DateTime end = formatter.parseDateTime(splits[1]);
String version = splits[2];
String trail = splits.length > 3 ? join(splits, DataSegment.delimiter, 3, splits.length) : null;
return new SegmentIdentifierParts(
dataSource,
new Interval(start.getMillis(), end.getMillis()),
version,
trail
);
try {
DateTime start = formatter.parseDateTime(splits[0]);
DateTime end = formatter.parseDateTime(splits[1]);
String version = splits[2];
String trail = splits.length > 3 ? join(splits, DataSegment.delimiter, 3, splits.length) : null;
return new SegmentIdentifierParts(
dataSource,
new Interval(start.getMillis(), end.getMillis()),
version,
trail
);
} catch (IllegalArgumentException e) {
return null;
}
}
public static String withInterval(final String dataSource, final String identifier, Interval newInterval)
{
SegmentIdentifierParts segmentDesc = DataSegmentUtils.parse(dataSource, identifier);
SegmentIdentifierParts segmentDesc = DataSegmentUtils.valueOf(dataSource, identifier);
if (segmentDesc == null) {
// happens for test segments which has invalid segment id.. ignore for now
LOGGER.warn("Invalid segment identifier " + identifier);

View File

@ -103,21 +103,21 @@ public class DataSegmentUtilsTest
Assert.assertEquals(desc, DataSegmentUtils.valueOf(dataSource, desc.toString()));
}
@Test(expected = IllegalArgumentException.class)
@Test
public void testInvalidFormat0()
{
DataSegmentUtils.valueOf("ds", "datasource_2015-01-02T00:00:00.000Z_2014-10-20T00:00:00.000Z_version");
Assert.assertNull(DataSegmentUtils.valueOf("ds", "datasource_2015-01-02T00:00:00.000Z_2014-10-20T00:00:00.000Z_version"));
}
@Test(expected = IllegalArgumentException.class)
@Test
public void testInvalidFormat1()
{
DataSegmentUtils.valueOf("datasource", "datasource_invalid_interval_version");
Assert.assertNull(DataSegmentUtils.valueOf("datasource", "datasource_invalid_interval_version"));
}
@Test(expected = IllegalArgumentException.class)
@Test
public void testInvalidFormat2()
{
DataSegmentUtils.valueOf("datasource", "datasource_2015-01-02T00:00:00.000Z_version");
Assert.assertNull(DataSegmentUtils.valueOf("datasource", "datasource_2015-01-02T00:00:00.000Z_version"));
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -350,7 +351,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
public <T extends LogicalSegment> List<T> filterSegments(SelectQuery query, List<T> segments)
{
// at the point where this code is called, only one datasource should exist.
String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
PagingSpec pagingSpec = query.getPagingSpec();
Map<String, Integer> paging = pagingSpec.getPagingIdentifiers();
@ -360,8 +361,22 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
final Granularity granularity = query.getGranularity();
// A paged select query using a UnionDataSource will return pagingIdentifiers from segments in more than one
// dataSource which confuses subsequent queries and causes a failure. To avoid this, filter only the paging keys
// that are applicable to this dataSource so that each dataSource in a union query gets the appropriate keys.
final Iterable<String> filteredPagingKeys = Iterables.filter(
paging.keySet(), new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return DataSegmentUtils.valueOf(dataSource, input) != null;
}
}
);
List<Interval> intervals = Lists.newArrayList(
Iterables.transform(paging.keySet(), DataSegmentUtils.INTERVAL_EXTRACTOR(dataSource))
Iterables.transform(filteredPagingKeys, DataSegmentUtils.INTERVAL_EXTRACTOR(dataSource))
);
Collections.sort(
intervals, query.isDescending() ? Comparators.intervalsByEndThenStart()

View File

@ -21,6 +21,7 @@ package io.druid.query.select;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
@ -33,6 +34,8 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;
import io.druid.query.UnionQueryRunner;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.ordering.StringComparators;
import io.druid.segment.IncrementalIndexSegment;
@ -313,6 +316,38 @@ public class MultiSegmentSelectQueryTest
}
}
@Test
public void testPagingIdentifiersForUnionDatasource()
{
Druids.SelectQueryBuilder selectQueryBuilder = Druids
.newSelectQueryBuilder()
.dataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(QueryRunnerTestHelper.dataSource),
new TableDataSource("testing-2")
)
)
)
.intervals(SelectQueryRunnerTest.I_0112_0114)
.granularity(QueryRunnerTestHelper.allGran)
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions))
.pagingSpec(PagingSpec.newSpec(3));
SelectQuery query = selectQueryBuilder.build();
QueryRunner unionQueryRunner = new UnionQueryRunner(runner);
List<Result<SelectResultValue>> results = Sequences.toList(
unionQueryRunner.run(query, ImmutableMap.of()),
Lists.<Result<SelectResultValue>>newArrayList()
);
Map<String, Integer> pagingIdentifiers = results.get(0).getValue().getPagingIdentifiers();
query = query.withPagingSpec(toNextCursor(PagingSpec.merge(Arrays.asList(pagingIdentifiers)), query, 3));
Sequences.toList(unionQueryRunner.run(query, ImmutableMap.of()), Lists.<Result<SelectResultValue>>newArrayList());
}
private PagingSpec toNextCursor(Map<String, Integer> merged, SelectQuery query, int threshold)
{
if (!fromNext) {