Empty partitionDimension results in less rollup than when it is explicitly specified (#9861)

* Empty partitionDimension results in less rollup than the case when it is explicitly specified

* Adding a unit test for the empty partitionDimension scenario. Fixing another test that was failing

* Fixing a CI build inspection issue

* Addressing all review comments

* Updating the javadocs for the hash method in HashBasedNumberedShardSpec
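
Why this matters: with an empty partitionDimension, the partition hash covers the event timestamp as well as the dimension values, so two rows that become identical after query-granularity truncation can land on different partitions, and rows on different partitions can never be rolled up into one. A minimal, self-contained sketch of the effect (the class and helper below are illustrative stand-ins, not Druid code):

import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for the group-key construction, not the Druid implementation.
public class RollupSketch
{
  // With empty partitionDimensions, the group key covers the timestamp that is
  // passed in plus every dimension value (compare Rows.toGroupKey in Druid).
  static List<Object> groupKey(long timestamp, List<String> dims)
  {
    return Arrays.asList(timestamp, dims);
  }

  public static void main(String[] args)
  {
    List<String> dims = Arrays.asList("0", "iphone");
    long t1 = 1388534400000L;        // 2014-01-01T00:00:00Z
    long t2 = t1 + 30 * 60 * 1000;   // 00:30:00, same HOUR bucket
    long hourBucket = t1;            // bucket start shared by both rows

    // Raw timestamps: different group keys, so the rows may hash to different
    // partitions and can then never be rolled up at HOUR granularity.
    System.out.println(groupKey(t1, dims).equals(groupKey(t2, dims)));   // false

    // Bucketed timestamps (this commit's fix): identical group keys, so the
    // rows always hash to the same partition and can be rolled up.
    System.out.println(groupKey(hourBucket, dims).equals(groupKey(hourBucket, dims)));   // true
  }
}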
Mainak Ghosh 2020-06-05 12:42:42 -07:00 committed by GitHub
parent 77dd5b06ae
commit bcc066a27f
13 changed files with 174 additions and 37 deletions

View File

@@ -76,6 +76,16 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
return (((long) hash(timestamp, inputRow)) - getPartitionNum()) % getPartitions() == 0;
}
/**
* This method calculates the hash based on whether {@code partitionDimensions} is empty or not.
* If it is empty, both the {@code timestamp} and the dimension columns in {@code inputRow} are used,
* via {@link Rows#toGroupKey}. Otherwise, only the columns listed in {@code partitionDimensions} are used.
*
* @param timestamp should be bucketed with query granularity
* @param inputRow row from input data
*
* @return hash value
*/
protected int hash(long timestamp, InputRow inputRow)
{
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
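
For context, getGroupKey is not part of this hunk. Under the javadoc above it plausibly has the following shape (an assumed sketch, not the verbatim source):

List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
{
  if (partitionDimensions.isEmpty()) {
    // No explicit partition dimensions: group on the timestamp plus every dimension column.
    return Rows.toGroupKey(timestamp, inputRow);
  } else {
    // Explicit partition dimensions: group only on those columns.
    return Lists.transform(partitionDimensions, inputRow::getDimension);
  }
}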

View File

@@ -26,9 +26,9 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.IndexTask.ShardSpecs;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@@ -71,6 +71,7 @@ public class CachingLocalSegmentAllocator implements CachingSegmentAllocator
TaskToolbox toolbox,
String dataSource,
String taskId,
Granularity queryGranularity,
@Nullable SupervisorTaskAccess supervisorTaskAccess,
IntervalToSegmentIdsCreator intervalToSegmentIdsCreator
) throws IOException
@@ -112,7 +113,7 @@ public class CachingLocalSegmentAllocator implements CachingSegmentAllocator
sequenceNameToSegmentId.put(getSequenceName(interval, segmentIdentifier.getShardSpec()), segmentIdentifier);
}
}
shardSpecs = new ShardSpecs(shardSpecMap);
shardSpecs = new ShardSpecs(shardSpecMap, queryGranularity);
}
private static String findVersion(Map<Interval, String> intervalToVersion, Interval interval)

View File

@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexing.common.task.IndexTask.ShardSpecs;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
/**

View File

@@ -108,7 +108,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CircularBuffer;
import org.codehaus.plexus.util.FileUtils;
import org.joda.time.Interval;
@@ -887,6 +886,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
toolbox,
getDataSource(),
getId(),
dataSchema.getGranularitySpec().getQueryGranularity(),
null,
(CompletePartitionAnalysis) partitionAnalysis
);
@@ -1013,36 +1013,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
/**
* This class represents a map of (Interval, ShardSpec) and is used for easy shardSpec generation.
*/
static class ShardSpecs
{
private final Map<Interval, List<ShardSpec>> map;
ShardSpecs(final Map<Interval, List<ShardSpec>> map)
{
this.map = map;
}
/**
* Return a shardSpec for the given interval and input row.
*
* @param interval interval for shardSpec
* @param row input row
*
* @return a shardSpec
*/
ShardSpec getShardSpec(Interval interval, InputRow row)
{
final List<ShardSpec> shardSpecs = map.get(interval);
if (shardSpecs == null || shardSpecs.isEmpty()) {
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
}
return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(row.getTimestampFromEpoch(), row);
}
}
private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
{
return ingestionSchema.getIOConfig().getNonNullInputFormat();

View File

@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.common.task.IndexTask.ShardSpecs;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;

View File

@@ -23,6 +23,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAnalysis;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -77,6 +78,7 @@ public final class SegmentAllocators
final TaskToolbox toolbox,
final String dataSource,
final String taskId,
final Granularity queryGranularity,
final @Nullable SupervisorTaskAccess supervisorTaskAccess,
final CompletePartitionAnalysis partitionAnalysis
) throws IOException
@@ -85,6 +87,7 @@ public final class SegmentAllocators
toolbox,
dataSource,
taskId,
queryGranularity,
supervisorTaskAccess,
partitionAnalysis::convertToIntervalToSegmentIds
);

View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
* This class represents a map from an interval to a list of {@link ShardSpec}s and is used to
* select a shardSpec for a given interval and input row.
*/
public class ShardSpecs
{
private final Map<Interval, List<ShardSpec>> map;
private final Granularity queryGranularity;
ShardSpecs(final Map<Interval, List<ShardSpec>> map, Granularity queryGranularity)
{
this.map = map;
this.queryGranularity = queryGranularity;
}
/**
* Return a shardSpec for the given interval and input row.
*
* @param interval interval for shardSpec
* @param row input row
*
* @return a shardSpec
*/
ShardSpec getShardSpec(Interval interval, InputRow row)
{
final List<ShardSpec> shardSpecs = map.get(interval);
if (shardSpecs == null || shardSpecs.isEmpty()) {
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
}
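// Truncate the row timestamp to the start of its query-granularity bucket so that all rows
// in the same bucket hash identically and can later be rolled up together.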
final long truncatedTimestamp = queryGranularity.bucketStart(row.getTimestamp()).getMillis();
return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(truncatedTimestamp, row);
}
}
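
The bucketStart truncation above is the heart of the fix. A quick sketch of its effect, assuming HOUR query granularity (Granularity.bucketStart, Granularities, and DateTimes are existing Druid utilities; the values are hand-derived):

// Both timestamps truncate to the same bucket start, so getShardSpec hashes them identically:
long a = Granularities.HOUR.bucketStart(DateTimes.of("2014-01-01T00:00:00.000Z")).getMillis();
long b = Granularities.HOUR.bucketStart(DateTimes.of("2014-01-01T00:30:20.456Z")).getMillis();
// a == b: both equal the millis of 2014-01-01T00:00:00.000Z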

View File

@@ -137,6 +137,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
toolbox,
getDataSource(),
getId(),
granularitySpec.getQueryGranularity(),
new SupervisorTaskAccess(supervisorTaskId, taskClient),
createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec)
);

View File

@@ -161,6 +161,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
toolbox,
getDataSource(),
getId(),
ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(),
new SupervisorTaskAccess(supervisorTaskId, taskClient),
partitionAnalysis
);

View File

@@ -1561,8 +1561,8 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertTrue(
StringUtils.format("Actual dimensions: %s", dimensions),
dimensions.equals(Sets.newHashSet("dim", "column_3")) ||
dimensions.equals(Sets.newHashSet("column_2", "column_3"))
dimensions.equals(Sets.newHashSet("column_2")) ||
dimensions.equals(Sets.newHashSet("dim", "column_2", "column_3"))
);
Assert.assertEquals(Collections.singletonList("val"), segment.getMetrics());

View File

@@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnaly
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.NoneGranularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.PartitionBoundaries;
@@ -104,6 +105,7 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
toolbox,
DATASOURCE,
TASKID,
new NoneGranularity(),
new SupervisorTaskAccessWithNullClient(SUPERVISOR_TASKID),
partitionAnalysis
);

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ShardSpecsTest extends IngestionTestBase
{
private final TestUtils testUtils = new TestUtils();
private final ObjectMapper jsonMapper = testUtils.getTestObjectMapper();
@Test
public void testShardSpecSelectionWithNullPartitionDimension()
{
ShardSpec spec1 = new HashBasedNumberedShardSpec(0, 2, null, jsonMapper);
ShardSpec spec2 = new HashBasedNumberedShardSpec(1, 2, null, jsonMapper);
Map<Interval, List<ShardSpec>> shardSpecMap = new HashMap<>();
shardSpecMap.put(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), Lists.newArrayList(spec1, spec2));
ShardSpecs shardSpecs = new ShardSpecs(shardSpecMap, Granularities.HOUR);
String visitorId = "visitorId";
String clientType = "clientType";
long timestamp1 = DateTimes.of("2014-01-01T00:00:00.000Z").getMillis();
InputRow row1 = new MapBasedInputRow(timestamp1,
Lists.newArrayList(visitorId, clientType),
ImmutableMap.of(visitorId, "0", clientType, "iphone")
);
long timestamp2 = DateTimes.of("2014-01-01T00:30:20.456Z").getMillis();
InputRow row2 = new MapBasedInputRow(timestamp2,
Lists.newArrayList(visitorId, clientType),
ImmutableMap.of(visitorId, "0", clientType, "iphone")
);
long timestamp3 = DateTimes.of("2014-01-01T10:10:20.456Z").getMillis();
InputRow row3 = new MapBasedInputRow(timestamp3,
Lists.newArrayList(visitorId, clientType),
ImmutableMap.of(visitorId, "0", clientType, "iphone")
);
ShardSpec spec3 = shardSpecs.getShardSpec(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), row1);
ShardSpec spec4 = shardSpecs.getShardSpec(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), row2);
ShardSpec spec5 = shardSpecs.getShardSpec(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), row3);
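// row1 and row2 fall in the same HOUR bucket (2014-01-01T00), so with no explicit
// partitionDimensions they produce the same group key and must select the same shardSpec;
// row3 falls in a different bucket (2014-01-01T10) and, for this data, hashes to the other spec.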
Assert.assertSame(spec3, spec4);
Assert.assertNotSame(spec3, spec5);
}
}

View File

@@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.task.SupervisorTaskAccessWithNullClient;
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.NoneGranularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -82,6 +83,7 @@ public class HashPartitionCachingLocalSegmentAllocatorTest
toolbox,
DATASOURCE,
TASKID,
new NoneGranularity(),
new SupervisorTaskAccessWithNullClient(SUPERVISOR_TASKID),
partitionAnalysis
);