diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 4436d717ee3..3a168b01f64 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -41,6 +41,7 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
 
 import javax.annotation.Nullable;
 import java.util.HashSet;
@@ -189,7 +190,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
     // 1) if something overlaps our timestamp, use that
     // 2) otherwise try preferredSegmentGranularity & going progressively smaller
 
-    final Interval rowInterval = queryGranularity.bucket(timestamp);
+    final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
 
     final Set<DataSegment> usedSegmentsForRow =
         new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 3a76276fd69..c5301bf6641 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -49,6 +50,7 @@ import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -948,6 +950,33 @@ public class SegmentAllocateActionTest
     Assert.assertEquals(ImmutableList.of("dim1"), hashBasedNumberedShardSpec.getPartitionDimensions());
   }
 
+  @Test
+  public void testSameIntervalWithSegmentGranularity()
+  {
+    final Task task = NoopTask.create();
+    taskActionTestKit.getTaskLockbox().add(task);
+    Granularity segmentGranularity = new PeriodGranularity(Period.hours(1), null, DateTimes.inferTzFromString("Asia/Shanghai"));
+
+    final SegmentIdWithShardSpec id1 = allocate(
+        task,
+        PARTY_TIME,
+        Granularities.MINUTE,
+        segmentGranularity,
+        "s1",
+        null
+    );
+    final SegmentIdWithShardSpec id2 = allocate(
+        task,
+        PARTY_TIME,
+        Granularities.MINUTE,
+        segmentGranularity,
+        "s2",
+        null
+    );
+    Assert.assertNotNull(id1);
+    Assert.assertNotNull(id2);
+  }
+
   private SegmentIdWithShardSpec allocate(
       final Task task,
       final DateTime timestamp,
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 1cf9b945b1f..882fd8c7e3a 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -52,6 +52,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.Query;
@@ -503,6 +504,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     Preconditions.checkNotNull(sequenceName, "sequenceName");
     Preconditions.checkNotNull(interval, "interval");
     Preconditions.checkNotNull(maxVersion, "version");
+    Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC());
 
     return connector.retryWithHandle(
         handle -> {
@@ -511,7 +513,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
               handle,
               dataSource,
               sequenceName,
-              interval,
+              allocateInterval,
               partialShardSpec,
               maxVersion
           );
@@ -521,7 +523,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
               dataSource,
               sequenceName,
               previousSegmentId,
-              interval,
+              allocateInterval,
               partialShardSpec,
               maxVersion
           );