Fix long overflow in SegmentCostCache.Bucket.toLocalInterval (#12257)

Problem:
When using a `CachingCostBalancerStrategy` with segments of granularity ALL,
no segment gets loaded.
- With granularity ALL, segments of eternity interval are created which have 
  `start = Long.MIN_VALUE / 2` and `end = Long.MAX_VALUE / 2`.
- For cost calculation in the balancer strategy, `toLocalInterval()` method is invoked where
  `Long.MIN_VALUE / 2` or `Long.MAX_VALUE / 2` cause an overflow thus resulting in no overlap.
- The strategy is unable to find any eligible server for loading a given segment.

Fix:
- Reverse order of operations to divide by `MILLIS_FACTOR` (~10^8) first,
   then do the subtraction to prevent Long overflow.
This commit is contained in:
tejaswini-imply 2022-02-17 15:13:51 +05:30 committed by GitHub
parent 575874705f
commit 70c40c4281
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 1 deletions

View File

@ -307,7 +307,7 @@ public class SegmentsCostCache
private static double toLocalInterval(long millis, Interval interval) private static double toLocalInterval(long millis, Interval interval)
{ {
return (millis - interval.getStartMillis()) / MILLIS_FACTOR; return millis / MILLIS_FACTOR - interval.getStartMillis() / MILLIS_FACTOR;
} }
public static Builder builder(Interval interval) public static Builder builder(Interval interval)

View File

@ -30,12 +30,14 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.LoggingEmitter; import org.apache.druid.java.util.emitter.core.LoggingEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BalancerStrategy; import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CachingCostBalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers; import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.CoordinatorStats;
@ -48,7 +50,9 @@ import org.apache.druid.server.coordinator.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.ReplicationThrottler; import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.cost.ClusterCostCache;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -89,6 +93,8 @@ public class LoadRuleTest
private ListeningExecutorService exec; private ListeningExecutorService exec;
private BalancerStrategy balancerStrategy; private BalancerStrategy balancerStrategy;
private CachingCostBalancerStrategy cachingCostBalancerStrategy;
private BalancerStrategy mockBalancerStrategy; private BalancerStrategy mockBalancerStrategy;
@Before @Before
@ -101,6 +107,8 @@ public class LoadRuleTest
exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class); mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
} }
@ -379,6 +387,48 @@ public class LoadRuleTest
EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy); EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy);
} }
@Test
public void testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(false).anyTimes();
LoadRule rule = createLoadRule(ImmutableMap.of("tier1", 1));
DataSegment segment0 = createDataSegmentWithIntervalAndPartition(createDataSegment("foo"),
JodaUtils.MIN_INSTANT,
JodaUtils.MAX_INSTANT,
0);
DataSegment segment1 = createDataSegmentWithIntervalAndPartition(createDataSegment("foo"),
JodaUtils.MIN_INSTANT,
JodaUtils.MAX_INSTANT,
1);
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment0), true);
loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().once();
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(cachingCostBalancerStrategy)
.anyTimes();
EasyMock.replay(throttler, loadingPeon, mockBalancerStrategy);
ImmutableDruidServer server =
new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "tier1", 1).toImmutableDruidServer();
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier("tier1", new ServerHolder(server, loadingPeon))
.build();
final CoordinatorStats stats = rule.run(null, makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(druidCluster, segment0, segment1), segment1);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier1"));
EasyMock.verify(throttler, loadingPeon, mockBalancerStrategy);
}
@Test @Test
public void testLoadPriority() public void testLoadPriority()
{ {
@ -810,6 +860,14 @@ public class LoadRuleTest
); );
} }
private DataSegment createDataSegmentWithIntervalAndPartition(DataSegment dataSegment, long startMillis, long endMillis, int partitionNum)
{
return new DataSegment.Builder(dataSegment)
.interval(new Interval(startMillis, endMillis, dataSegment.getInterval().getChronology()))
.shardSpec(new LinearShardSpec(partitionNum))
.build();
}
private static LoadRule createLoadRule(final Map<String, Integer> tieredReplicants) private static LoadRule createLoadRule(final Map<String, Integer> tieredReplicants)
{ {
return new LoadRule() return new LoadRule()