From 70c40c4281d28d17b25ee19691cbfdf11a951bb2 Mon Sep 17 00:00:00 2001
From: tejaswini-imply <96047043+tejaswini-imply@users.noreply.github.com>
Date: Thu, 17 Feb 2022 15:13:51 +0530
Subject: [PATCH] Fix long overflow in SegmentsCostCache.Bucket.toLocalInterval
 (#12257)

Problem:
When using a `CachingCostBalancerStrategy` with segments of granularity ALL,
no segment gets loaded.
- With granularity ALL, segments with the eternity interval are created, which
  have `start = Long.MIN_VALUE / 2` and `end = Long.MAX_VALUE / 2`.
- During cost calculation in the balancer strategy, the `toLocalInterval()`
  method is invoked, where `Long.MIN_VALUE / 2` or `Long.MAX_VALUE / 2` causes
  a long overflow, resulting in no overlap.
- The strategy is therefore unable to find any eligible server to load a given
  segment.

Fix:
- Reverse the order of operations: divide by `MILLIS_FACTOR` (~10^8) first,
  then do the subtraction, to prevent the long overflow. (A worked example of
  the overflow follows the diff below.)
---
 .../coordinator/cost/SegmentsCostCache.java   |  2 +-
 .../coordinator/rules/LoadRuleTest.java       | 58 +++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
index 561bc3322da..9271de28425 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/SegmentsCostCache.java
@@ -307,7 +307,7 @@ public class SegmentsCostCache
 
     private static double toLocalInterval(long millis, Interval interval)
     {
-      return (millis - interval.getStartMillis()) / MILLIS_FACTOR;
+      return millis / MILLIS_FACTOR - interval.getStartMillis() / MILLIS_FACTOR;
     }
 
     public static Builder builder(Interval interval)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 46da5eddc76..99a975caff4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -30,12 +30,14 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.LoggingEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.BalancerStrategy;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategy;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -48,7 +50,9 @@ import org.apache.druid.server.coordinator.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.ReplicationThrottler;
 import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.cost.ClusterCostCache;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
@@ -89,6 +93,8 @@ public class LoadRuleTest
   private ListeningExecutorService exec;
   private BalancerStrategy balancerStrategy;
 
+  private CachingCostBalancerStrategy cachingCostBalancerStrategy;
+
   private BalancerStrategy mockBalancerStrategy;
 
   @Before
@@ -101,6 +107,8 @@
     exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
 
+    cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
+
     mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
   }
 
@@ -379,6 +387,48 @@
     EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy);
   }
 
+  @Test
+  public void testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy()
+  {
+    EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(false).anyTimes();
+
+    LoadRule rule = createLoadRule(ImmutableMap.of("tier1", 1));
+
+    DataSegment segment0 = createDataSegmentWithIntervalAndPartition(createDataSegment("foo"),
+                                                                     JodaUtils.MIN_INSTANT,
+                                                                     JodaUtils.MAX_INSTANT,
+                                                                     0);
+    DataSegment segment1 = createDataSegmentWithIntervalAndPartition(createDataSegment("foo"),
+                                                                     JodaUtils.MIN_INSTANT,
+                                                                     JodaUtils.MAX_INSTANT,
+                                                                     1);
+
+    final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment0), true);
+
+    loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.isNull());
+    EasyMock.expectLastCall().once();
+
+    EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
+            .andDelegateTo(cachingCostBalancerStrategy)
+            .anyTimes();
+
+    EasyMock.replay(throttler, loadingPeon, mockBalancerStrategy);
+
+    ImmutableDruidServer server =
+        new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "tier1", 1).toImmutableDruidServer();
+
+    DruidCluster druidCluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier("tier1", new ServerHolder(server, loadingPeon))
+        .build();
+
+    final CoordinatorStats stats = rule.run(null, makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(druidCluster, segment0, segment1), segment1);
+
+    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier1"));
+
+    EasyMock.verify(throttler, loadingPeon, mockBalancerStrategy);
+  }
+
   @Test
   public void testLoadPriority()
   {
@@ -810,6 +860,14 @@
     );
   }
 
+  private DataSegment createDataSegmentWithIntervalAndPartition(DataSegment dataSegment, long startMillis, long endMillis, int partitionNum)
+  {
+    return new DataSegment.Builder(dataSegment)
+        .interval(new Interval(startMillis, endMillis, dataSegment.getInterval().getChronology()))
+        .shardSpec(new LinearShardSpec(partitionNum))
+        .build();
+  }
+
   private static LoadRule createLoadRule(final Map<String, Integer> tieredReplicants)
   {
     return new LoadRule()
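
For illustration, here is a minimal, self-contained sketch of the overflow
this patch fixes. The names and constants in it are assumptions: the class
LongOverflowDemo is hypothetical, MILLIS_FACTOR is a stand-in double of 1e8
(the commit message only says ~10^8), and the 30-day margin is added purely
for the demo, because Long.MAX_VALUE / 2 - Long.MIN_VALUE / 2 on its own is
exactly Long.MAX_VALUE and does not yet wrap; in the real cost calculation
the surrounding interval arithmetic presumably supplies the extra millis.

public class LongOverflowDemo
{
  // Assumed stand-in for SegmentsCostCache.MILLIS_FACTOR (~10^8 per the
  // commit message above).
  private static final double MILLIS_FACTOR = 1e8;

  public static void main(String[] args)
  {
    // Segments of granularity ALL span the eternity interval,
    // i.e. [Long.MIN_VALUE / 2, Long.MAX_VALUE / 2]
    // (JodaUtils.MIN_INSTANT / JodaUtils.MAX_INSTANT).
    long start = Long.MIN_VALUE / 2;
    long end = Long.MAX_VALUE / 2;

    // Hypothetical 30-day margin, for the demo only: any positive widening
    // pushes the long subtraction below past Long.MAX_VALUE.
    long margin = 30L * 24 * 60 * 60 * 1000;
    long widenedStart = start - margin;

    // Old formula: subtract first, then divide. The long subtraction wraps
    // to a large negative value, so the segment appears to overlap nothing
    // and no server is ever eligible to load it.
    double before = (end - widenedStart) / MILLIS_FACTOR;

    // Fixed formula: divide each operand first; the subtraction then runs
    // in double arithmetic and cannot wrap.
    double after = end / MILLIS_FACTOR - widenedStart / MILLIS_FACTOR;

    System.out.println(before); // approx -9.2e10 (wrapped, wrong sign)
    System.out.println(after);  // approx  9.2e10 (expected positive span)
  }
}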