mirror of https://github.com/apache/druid.git
Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
parent
b924161086
commit
063811710e
|
@ -41,6 +41,7 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
|
|||
import org.apache.druid.timeline.partition.PartialShardSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.chrono.ISOChronology;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashSet;
|
||||
|
@ -189,7 +190,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
|||
// 1) if something overlaps our timestamp, use that
|
||||
// 2) otherwise try preferredSegmentGranularity & going progressively smaller
|
||||
|
||||
final Interval rowInterval = queryGranularity.bucket(timestamp);
|
||||
final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
|
||||
|
||||
final Set<DataSegment> usedSegmentsForRow =
|
||||
new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
|
|||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
|
||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
|
||||
|
@ -49,6 +50,7 @@ import org.apache.druid.timeline.partition.ShardSpec;
|
|||
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -948,6 +950,33 @@ public class SegmentAllocateActionTest
|
|||
Assert.assertEquals(ImmutableList.of("dim1"), hashBasedNumberedShardSpec.getPartitionDimensions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSameIntervalWithSegmentGranularity()
|
||||
{
|
||||
final Task task = NoopTask.create();
|
||||
taskActionTestKit.getTaskLockbox().add(task);
|
||||
Granularity segmentGranularity = new PeriodGranularity(Period.hours(1), null, DateTimes.inferTzFromString("Asia/Shanghai"));
|
||||
|
||||
final SegmentIdWithShardSpec id1 = allocate(
|
||||
task,
|
||||
PARTY_TIME,
|
||||
Granularities.MINUTE,
|
||||
segmentGranularity,
|
||||
"s1",
|
||||
null
|
||||
);
|
||||
final SegmentIdWithShardSpec id2 = allocate(
|
||||
task,
|
||||
PARTY_TIME,
|
||||
Granularities.MINUTE,
|
||||
segmentGranularity,
|
||||
"s2",
|
||||
null
|
||||
);
|
||||
Assert.assertNotNull(id1);
|
||||
Assert.assertNotNull(id2);
|
||||
}
|
||||
|
||||
private SegmentIdWithShardSpec allocate(
|
||||
final Task task,
|
||||
final DateTime timestamp,
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
|
|||
import org.apache.druid.timeline.partition.PartitionChunk;
|
||||
import org.apache.druid.timeline.partition.ShardSpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.chrono.ISOChronology;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.Query;
|
||||
|
@ -503,6 +504,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
|||
Preconditions.checkNotNull(sequenceName, "sequenceName");
|
||||
Preconditions.checkNotNull(interval, "interval");
|
||||
Preconditions.checkNotNull(maxVersion, "version");
|
||||
Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC());
|
||||
|
||||
return connector.retryWithHandle(
|
||||
handle -> {
|
||||
|
@ -511,7 +513,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
|||
handle,
|
||||
dataSource,
|
||||
sequenceName,
|
||||
interval,
|
||||
allocateInterval,
|
||||
partialShardSpec,
|
||||
maxVersion
|
||||
);
|
||||
|
@ -521,7 +523,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
|||
dataSource,
|
||||
sequenceName,
|
||||
previousSegmentId,
|
||||
interval,
|
||||
allocateInterval,
|
||||
partialShardSpec,
|
||||
maxVersion
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue