Test: Simplify test impl of LoadQueuePeon (#14684)

Changes
- Rename `LoadQueuePeonTester` to `TestLoadQueuePeon`
- Simplify `TestLoadQueuePeon` by removing dependency on `CuratorLoadQueuePeon`
- Remove usages of mock peons in `LoadRuleTest` and use `TestLoadQueuePeon` instead
This commit is contained in:
Kashif Faraz 2023-07-28 16:14:23 +05:30 committed by GitHub
parent d406bafdfc
commit 22290fd632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 296 additions and 385 deletions

View File

@ -27,7 +27,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
@ -105,7 +105,7 @@ public class BalancerStrategyBenchmark
ImmutableMap.of("test", new ImmutableDruidDataSource("test", Collections.emptyMap(), segments)),
segments.size()
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
}

View File

@ -31,8 +31,8 @@ import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.PeriodLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
@ -117,7 +117,7 @@ public class BalanceSegmentsProfiler
EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(server);
LoadQueuePeon peon = new LoadQueuePeonTester();
LoadQueuePeon peon = new TestLoadQueuePeon();
serverHolderList.add(new ServerHolder(server, peon));
}
@ -154,8 +154,8 @@ public class BalanceSegmentsProfiler
public void profileRun()
{
Stopwatch watch = Stopwatch.createUnstarted();
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
TestLoadQueuePeon toPeon = new TestLoadQueuePeon();
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();

View File

@ -22,7 +22,7 @@ package org.apache.druid.server.coordinator;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@ -47,13 +47,13 @@ public class DruidClusterTest
private static final ServerHolder NEW_REALTIME = new ServerHolder(
new DruidServer("name1", "host2", null, 100L, ServerType.REALTIME, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
private static final ServerHolder NEW_HISTORICAL = new ServerHolder(
new DruidServer("name1", "host2", null, 100L, ServerType.HISTORICAL, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
private DruidCluster.Builder clusterBuilder;
@ -67,14 +67,14 @@ public class DruidClusterTest
new ServerHolder(
new DruidServer("name1", "host1", null, 100L, ServerType.REALTIME, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
)
.add(
new ServerHolder(
new DruidServer("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
}

View File

@ -23,8 +23,8 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
@ -140,7 +140,7 @@ public class RoundRobinServerSelectorTest
return new ServerHolder(
new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1)
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
}
}

View File

@ -26,7 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
@ -79,7 +79,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
// available size of 100
@ -90,7 +90,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
// available size of 10
@ -101,7 +101,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
// available size of 50
@ -112,7 +112,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
Assert.assertEquals(0, h1.compareTo(h2));
@ -130,7 +130,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final ServerHolder h2 = new ServerHolder(
@ -140,7 +140,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final ServerHolder h3 = new ServerHolder(
@ -150,7 +150,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final ServerHolder h4 = new ServerHolder(
@ -160,7 +160,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final ServerHolder h5 = new ServerHolder(
@ -170,7 +170,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
Assert.assertEquals(h1, h2);
@ -189,7 +189,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));

View File

@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
@ -84,7 +84,7 @@ public class BalancerStrategyTest
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
new TestLoadQueuePeon());
Assert.assertFalse(
balancerStrategy.findServersToLoadSegment(
proposedDataSegment,
@ -99,12 +99,12 @@ public class BalancerStrategyTest
final ServerHolder serverHolder1 = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
new TestLoadQueuePeon());
final ServerHolder serverHolder2 = new ServerHolder(
new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.addDataSegment(proposedDataSegment).toImmutableDruidServer(),
new LoadQueuePeonTester());
new TestLoadQueuePeon());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder1);
@ -119,7 +119,7 @@ public class BalancerStrategyTest
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(),
new LoadQueuePeonTester());
new TestLoadQueuePeon());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder);
final ServerHolder foundServerHolder = balancerStrategy

View File

@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -141,7 +141,7 @@ public class CachingCostBalancerStrategyTest
.forEach(druidServer::addDataSegment);
return new ServerHolder(
druidServer.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
}

View File

@ -29,7 +29,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@ -283,7 +283,7 @@ public class CostBalancerStrategyTest
// Create ServerHolder for each server
final List<ServerHolder> serverHolders = historicals.stream().map(
server -> new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester())
server -> new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon())
).collect(Collectors.toList());
final ServerHolder serverA = serverHolders.get(0);
@ -321,11 +321,11 @@ public class CostBalancerStrategyTest
{
final ServerHolder serverA = new ServerHolder(
createHistorical().toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final ServerHolder serverB = new ServerHolder(
createHistorical().toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
final DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
@ -356,7 +356,7 @@ public class CostBalancerStrategyTest
.startingAt("2012-10-24")
.eachOfSizeInMb(100).get(0);
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
final TestLoadQueuePeon peon = new TestLoadQueuePeon();
ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), peon);
ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), peon);

View File

@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
@ -57,7 +57,7 @@ public class DiskNormalizedCostBalancerStrategyTest
// Create 10 servers with current size being 3K & max size being 10K
// Each having having 100 segments
for (int i = 0; i < serverCount; i++) {
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
List<DataSegment> segments = IntStream
.range(0, maxSegments)
@ -78,7 +78,7 @@ public class DiskNormalizedCostBalancerStrategyTest
}
// The best server to be available for next segment assignment has greater max Size
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();

View File

@ -25,8 +25,8 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@ -175,7 +175,7 @@ public class ReservoirSegmentSamplerTest
.addDataSegment(segments.get(2))
.addDataSegment(segments.get(3))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
// Try to pick all the segments on the servers
@ -380,6 +380,6 @@ public class ReservoirSegmentSamplerTest
for (DataSegment segment : loadedSegments) {
server.addDataSegment(segment);
}
return new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester());
return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon());
}
}

View File

@ -31,8 +31,8 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@ -423,7 +423,7 @@ public class BalanceSegmentsTest
return new ServerHolder(
server.toImmutableDruidServer(),
new LoadQueuePeonTester(),
new TestLoadQueuePeon(),
isDecommissioning,
maxSegmentsInLoadQueue,
10

View File

@ -33,8 +33,8 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@ -61,10 +61,10 @@ public class UnloadUnusedSegmentsTest
private ImmutableDruidServer historicalServerTier2;
private ImmutableDruidServer brokerServer;
private ImmutableDruidServer indexerServer;
private LoadQueuePeonTester historicalPeon;
private LoadQueuePeonTester historicalTier2Peon;
private LoadQueuePeonTester brokerPeon;
private LoadQueuePeonTester indexerPeon;
private TestLoadQueuePeon historicalPeon;
private TestLoadQueuePeon historicalTier2Peon;
private TestLoadQueuePeon brokerPeon;
private TestLoadQueuePeon indexerPeon;
private DataSegment segment1;
private DataSegment segment2;
private List<DataSegment> segments;
@ -143,10 +143,10 @@ public class UnloadUnusedSegmentsTest
segmentsForRealtime.add(realtimeOnlySegment);
segmentsForRealtime.add(broadcastSegment);
historicalPeon = new LoadQueuePeonTester();
historicalTier2Peon = new LoadQueuePeonTester();
brokerPeon = new LoadQueuePeonTester();
indexerPeon = new LoadQueuePeonTester();
historicalPeon = new TestLoadQueuePeon();
historicalTier2Peon = new TestLoadQueuePeon();
brokerPeon = new TestLoadQueuePeon();
indexerPeon = new TestLoadQueuePeon();
final ImmutableDruidDataSource dataSource1 = new ImmutableDruidDataSource(
"datasource1",

View File

@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.loading;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.concurrent.ConcurrentSkipListSet;
public class LoadQueuePeonTester extends CuratorLoadQueuePeon
{
private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new ConcurrentSkipListSet<>();
public LoadQueuePeonTester()
{
super(
null,
null,
null,
Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
null,
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
}
@Override
public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback)
{
segmentsToLoad.add(segment);
}
@Override
public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
{
return segmentsToLoad;
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.loading;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
* Test implementation of {@link LoadQueuePeon} that maintains a set of segments
* that have been queued for load or drop.
*/
public class TestLoadQueuePeon implements LoadQueuePeon
{
private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new ConcurrentSkipListSet<>();
private final ConcurrentSkipListSet<DataSegment> segmentsToDrop = new ConcurrentSkipListSet<>();
private final CoordinatorRunStats stats = new CoordinatorRunStats();
@Override
public void loadSegment(DataSegment segment, SegmentAction action, @Nullable LoadPeonCallback callback)
{
segmentsToLoad.add(segment);
}
@Override
public void dropSegment(DataSegment segment, LoadPeonCallback callback)
{
segmentsToDrop.add(segment);
}
@Override
public long getSizeOfSegmentsToLoad()
{
return 0;
}
@Override
public CoordinatorRunStats getAndResetStats()
{
return stats;
}
@Override
public boolean cancelOperation(DataSegment segment)
{
return false;
}
@Override
public void start()
{
}
@Override
public void stop()
{
}
@Override
public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
{
return segmentsToLoad;
}
@Override
public Set<SegmentHolder> getSegmentsInQueue()
{
return Collections.emptySet();
}
@Override
public Set<DataSegment> getSegmentsToDrop()
{
return segmentsToDrop;
}
@Override
public Set<DataSegment> getTimedOutSegments()
{
return Collections.emptySet();
}
@Override
public void markSegmentToDrop(DataSegment segmentToLoad)
{
}
@Override
public void unmarkSegmentToDrop(DataSegment segmentToLoad)
{
}
@Override
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return Collections.emptySet();
}
}

View File

@ -27,9 +27,9 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@ -120,7 +120,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(smallSegment)
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
holdersOfLargeSegments.add(
@ -135,7 +135,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(0))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
holdersOfLargeSegments.add(
@ -150,7 +150,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(1))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
holdersOfLargeSegments.add(
@ -165,7 +165,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(2))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
@ -181,7 +181,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments2.get(0))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
holdersOfLargeSegments2.add(
@ -196,7 +196,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments2.get(1))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
)
);
@ -211,7 +211,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(0))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
new TestLoadQueuePeon()
);
decommissioningServer1 = new ServerHolder(
@ -225,7 +225,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(smallSegment)
.toImmutableDruidServer(),
new LoadQueuePeonTester(),
new TestLoadQueuePeon(),
true
);
@ -240,7 +240,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(1))
.toImmutableDruidServer(),
new LoadQueuePeonTester(),
new TestLoadQueuePeon(),
true
);

View File

@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -38,18 +37,14 @@ import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategy;
import org.apache.druid.server.coordinator.balancer.ClusterCostCache;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentHolder;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -62,7 +57,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -96,7 +90,7 @@ public class LoadRuleTest
public void setUp()
{
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d"));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
balancerStrategy = new CostBalancerStrategy(exec);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
}
@ -107,17 +101,15 @@ public class LoadRuleTest
}
@Test
public void testLoad()
public void testLoadRuleAssignsSegments()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
// Cluster has 2 tiers with 1 server each
final ServerHolder server1 = createServer(Tier.T1);
final ServerHolder server2 = createServer(Tier.T2);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
.addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
.addTier(Tier.T1, server1)
.addTier(Tier.T2, server2)
.build();
final DataSegment segment = createDataSegment(DS_WIKI);
@ -126,8 +118,6 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon);
}
private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment segment, DruidCluster cluster)
@ -169,99 +159,56 @@ public class LoadRuleTest
@Test
public void testLoadPrimaryAssignDoesNotOverAssign()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
ServerHolder server1 = createServer(Tier.T1);
ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, mockPeon), new ServerHolder(server2, mockPeon))
.addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
CoordinatorRunStats firstRunStats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, firstRunStats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
Assert.assertEquals(1, server1.getLoadingSegments().size() + server2.getLoadingSegments().size());
// ensure multiple runs don't assign primary segment again if at replication count
final LoadQueuePeon loadingPeon = createLoadingPeon(segment, false);
EasyMock.replay(loadingPeon);
DruidCluster afterLoad = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, loadingPeon), new ServerHolder(server2, mockPeon))
.build();
CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, afterLoad);
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
EasyMock.verify(mockPeon);
// Verify that multiple runs don't assign primary segment again if at replication count
CoordinatorRunStats secondRunStats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertFalse(secondRunStats.hasStat(Stats.Segments.ASSIGNED));
Assert.assertEquals(1, server1.getLoadingSegments().size() + server2.getLoadingSegments().size());
}
@Test
@Ignore("Enable this test when timeout behaviour is fixed")
public void testOverAssignForTimedOutSegments()
{
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
ServerHolder server1 = createServer(Tier.T1);
ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
segment,
makeCoordinatorRuntimeParams(druidCluster, segment)
);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
// Ensure that the segment is assigned to one of the historicals
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
// Ensure that the primary segment is assigned again in case the peon timed out on loading the segment
final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
EasyMock.replay(slowLoadingPeon);
DruidCluster withLoadTimeout = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon))
.build();
CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(
rule,
segment,
makeCoordinatorRuntimeParams(withLoadTimeout, segment)
);
CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(emptyPeon);
}
@Test
public void testSkipReplicationForTimedOutSegments()
{
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
ServerHolder server1 = createServer(Tier.T1);
ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
@ -272,20 +219,10 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
// Add the segment to the timed out list to simulate peon timeout on loading the segment
final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
EasyMock.replay(slowLoadingPeon);
DruidCluster withLoadTimeout = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon))
.build();
// Default behavior is to not replicate the timed out segments on other servers
CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, withLoadTimeout);
CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
EasyMock.verify(emptyPeon);
}
@Test
@ -297,14 +234,10 @@ public class LoadRuleTest
.withNumPartitions(2)
.eachOfSizeInMb(100);
final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().once();
EasyMock.replay(loadingPeon);
final ServerHolder server1 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
.addTier(Tier.T1, server1)
.build();
balancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
@ -316,104 +249,71 @@ public class LoadRuleTest
makeCoordinatorRuntimeParams(druidCluster, segments.toArray(new DataSegment[0]))
);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(loadingPeon);
}
@Test
public void testDrop()
public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
DruidServer server1 = createServer(Tier.T1);
server1.addDataSegment(segment);
DruidServer server2 = createServer(Tier.T2);
server2.addDataSegment(segment);
DruidServer server3 = createServer(Tier.T2);
final ServerHolder serverT11 = createServer(Tier.T1, segment);
final ServerHolder serverT12 = createServer(Tier.T2, segment);
final ServerHolder serverT21 = createServer(Tier.T2, segment);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, new ServerHolder(server1.toImmutableDruidServer(), mockPeon))
.addTier(
Tier.T2,
new ServerHolder(server2.toImmutableDruidServer(), mockPeon),
new ServerHolder(server3.toImmutableDruidServer(), mockPeon)
)
.addTier(Tier.T1, serverT11)
.addTier(Tier.T2, serverT12, serverT21)
.build();
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon);
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T2, DS_WIKI));
}
@Test
public void testLoadWithNonExistentTier()
public void testLoadIgnoresInvalidTiers()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
ServerHolder server = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
.addTier(Tier.T1, server)
.build();
final DataSegment segment = createDataSegment(DS_WIKI);
LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 1));
LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(mockPeon);
Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "invalidTier", DS_WIKI));
}
@Test
public void testDropWithNonExistentTier()
public void testDropIgnoresInvalidTiers()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
DruidServer server1 = createServer(Tier.T1);
DruidServer server2 = createServer(Tier.T1);
server1.addDataSegment(segment);
server2.addDataSegment(segment);
// Cluster has 1 tier with 2 servers
ServerHolder server1 = createServer(Tier.T1, segment);
ServerHolder server2 = createServer(Tier.T1, segment);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(
Tier.T1,
new ServerHolder(server1.toImmutableDruidServer(), mockPeon),
new ServerHolder(server2.toImmutableDruidServer(), mockPeon)
)
.addTier(Tier.T1, server1, server2)
.build();
LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 1));
LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
EasyMock.verify(mockPeon);
Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.DROPPED, "invalidTier", DS_WIKI));
}
@Test
public void testMaxLoadingQueueSize()
{
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
final TestLoadQueuePeon peon = new TestLoadQueuePeon();
final int maxSegmentsInQueue = 2;
DruidCluster druidCluster = DruidCluster
@ -421,7 +321,7 @@ public class LoadRuleTest
.addTier(
Tier.T1,
new ServerHolder(
createServer(Tier.T1).toImmutableDruidServer(),
createDruidServer(Tier.T1).toImmutableDruidServer(),
peon, false, maxSegmentsInQueue, 10
)
)
@ -456,63 +356,55 @@ public class LoadRuleTest
Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment3.getDataSource()));
}
/**
* 2 servers in different tiers, the first is decommissioning.
* Should not load a segment to the server that is decommissioning
*/
@Test
public void testLoadDecommissioning()
public void testSegmentIsAssignedOnlyToActiveServer()
{
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
EasyMock.replay(mockPeon1, mockPeon2);
final ServerHolder decommServerT1 = createDecommissioningServer(Tier.T1);
final ServerHolder serverT2 = createServer(Tier.T2);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon1, true))
.addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon2, false))
.addTier(Tier.T1, decommServerT1)
.addTier(Tier.T2, serverT2)
.build();
// Load rule requires 1 replica on each tier
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1));
DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
// Verify that segment is not loaded on decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon1, mockPeon2);
Assert.assertEquals(0, decommServerT1.getLoadingSegments().size());
Assert.assertTrue(serverT2.getLoadingSegments().contains(segment));
}
/**
* 2 tiers, 2 servers each, 1 server of the second tier is decommissioning.
* Should not load a segment to the server that is decommssioning.
*/
@Test
public void testLoadReplicaDuringDecommissioning()
public void testSegmentIsAssignedOnlyToActiveServers()
{
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
final LoadQueuePeon mockPeon3 = createOneCallPeonMock();
final LoadQueuePeon mockPeon4 = createOneCallPeonMock();
EasyMock.replay(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
ServerHolder holder1 = createServerHolder(Tier.T1, mockPeon1, true);
ServerHolder holder2 = createServerHolder(Tier.T1, mockPeon2, false);
ServerHolder holder3 = createServerHolder(Tier.T2, mockPeon3, false);
ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
// 2 tiers with 2 servers each, 1 server is decommissioning
ServerHolder decommServerT11 = createDecommissioningServer(Tier.T1);
ServerHolder serverT12 = createServer(Tier.T1);
ServerHolder serverT21 = createServer(Tier.T2);
ServerHolder serverT22 = createServer(Tier.T2);
final DataSegment segment = createDataSegment(DS_WIKI);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, holder1, holder2)
.addTier(Tier.T2, holder3, holder4)
.addTier(Tier.T1, decommServerT11, serverT12)
.addTier(Tier.T2, serverT21, serverT22)
.build();
// Load rule requires 2 replicas on each server
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2, Tier.T2, 2));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
// Verify that no replica is assigned to decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
Assert.assertTrue(decommServerT11.getLoadingSegments().isEmpty());
Assert.assertEquals(0, decommServerT11.getLoadingSegments().size());
EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
}
/**
@ -522,26 +414,15 @@ public class LoadRuleTest
@Test
public void testDropDuringDecommissioning()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
EasyMock.replay(mockPeon);
final DataSegment segment1 = createDataSegment("foo1");
final DataSegment segment2 = createDataSegment("foo2");
DruidServer server1 = createServer(Tier.T1);
server1.addDataSegment(segment1);
DruidServer server2 = createServer(Tier.T1);
server2.addDataSegment(segment2);
final ServerHolder server1 = createDecommissioningServer(Tier.T1, segment1);
final ServerHolder server2 = createServer(Tier.T1, segment2);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(
Tier.T1,
new ServerHolder(server1.toImmutableDruidServer(), mockPeon, true),
new ServerHolder(server2.toImmutableDruidServer(), mockPeon, false)
)
.addTier(Tier.T1, server1, server2)
.build();
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, segment1, segment2);
@ -549,45 +430,29 @@ public class LoadRuleTest
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment1.getDataSource()));
Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1));
stats = runRuleAndGetStats(rule, segment2, params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment2.getDataSource()));
EasyMock.verify(mockPeon);
Assert.assertTrue(server2.getPeon().getSegmentsToDrop().contains(segment2));
}
/**
* 3 servers hosting 3 replicas of the segment.
* 1 servers is decommissioning.
* 1 replica is redundant.
* Should drop from the decommissioning server.
*/
@Test
public void testRedundantReplicaDropDuringDecommissioning()
public void testExtraReplicasAreDroppedFromDecommissioningServer()
{
final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
final DataSegment segment1 = createDataSegment(DS_WIKI);
DruidServer server1 = createServer(Tier.T1);
server1.addDataSegment(segment1);
DruidServer server2 = createServer(Tier.T1);
server2.addDataSegment(segment1);
DruidServer server3 = createServer(Tier.T1);
server3.addDataSegment(segment1);
// 3 servers, each serving the same segment
final ServerHolder server1 = createServer(Tier.T1, segment1);
final ServerHolder server2 = createDecommissioningServer(Tier.T1, segment1);
final ServerHolder server3 = createServer(Tier.T1, segment1);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(
Tier.T1,
new ServerHolder(server1.toImmutableDruidServer(), mockPeon1, false),
new ServerHolder(server2.toImmutableDruidServer(), mockPeon2, true),
new ServerHolder(server3.toImmutableDruidServer(), mockPeon3, false)
)
.addTier(Tier.T1, server1, server2, server3)
.build();
// Load rule requires 2 replicas
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2));
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
@ -595,10 +460,11 @@ public class LoadRuleTest
makeCoordinatorRuntimeParams(druidCluster, segment1)
);
// Verify that the extra replica is dropped from the decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
Assert.assertEquals(0, mockPeon1.getSegmentsToDrop().size());
Assert.assertEquals(1, mockPeon2.getSegmentsToDrop().size());
Assert.assertEquals(0, mockPeon3.getSegmentsToDrop().size());
Assert.assertEquals(0, server1.getPeon().getSegmentsToDrop().size());
Assert.assertEquals(1, server2.getPeon().getSegmentsToDrop().size());
Assert.assertEquals(0, server3.getPeon().getSegmentsToDrop().size());
}
private DataSegment createDataSegment(String dataSource)
@ -621,54 +487,36 @@ public class LoadRuleTest
return new ForeverLoadRule(tieredReplicants, null);
}
private static LoadQueuePeon createEmptyPeon()
{
final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSizeOfSegmentsToLoad()).andReturn(0L).anyTimes();
return mockPeon;
}
private static LoadQueuePeon createLoadingPeon(DataSegment segment, boolean slowLoading)
{
final Set<DataSegment> segs = Collections.singleton(segment);
final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsInQueue())
.andReturn(Collections.singleton(new SegmentHolder(segment, SegmentAction.LOAD, null))).anyTimes();
EasyMock.expect(mockPeon.getTimedOutSegments())
.andReturn(slowLoading ? segs : Collections.emptySet()).anyTimes();
return mockPeon;
}
private DruidServer createServer(String tier)
private DruidServer createDruidServer(String tier)
{
final String serverName = "hist_" + tier + "_" + serverId.incrementAndGet();
return new DruidServer(serverName, serverName, null, 10L << 30, ServerType.HISTORICAL, tier, 0);
}
private static LoadQueuePeon createOneCallPeonMock()
private ServerHolder createServer(String tier, DataSegment... segments)
{
final LoadQueuePeon mockPeon2 = createEmptyPeon();
mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().once();
return mockPeon2;
final DruidServer server = createDruidServer(tier);
for (DataSegment segment : segments) {
server.addDataSegment(segment);
}
return new ServerHolder(
server.toImmutableDruidServer(),
new TestLoadQueuePeon()
);
}
private ServerHolder createServerHolder(String tier, LoadQueuePeon loadQueuePeon, boolean isDecommissioning)
private ServerHolder createDecommissioningServer(String tier, DataSegment... segments)
{
final DruidServer server = createDruidServer(tier);
for (DataSegment segment : segments) {
server.addDataSegment(segment);
}
return new ServerHolder(
createServer(tier).toImmutableDruidServer(),
loadQueuePeon,
isDecommissioning
server.toImmutableDruidServer(),
new TestLoadQueuePeon(),
true
);
}