Avoid deletion of load/drop entry from CuratorLoadQueuePeon in case of load timeout (#10213)

* Skip queue removal on timeout

* Clarify error

* Add new config to control replication

Co-authored-by: Atul Mohan <atulmohan@yahoo-inc.com>
Atul Mohan 2021-03-17 13:34:05 -05:00 committed by GitHub
parent 1061faa6ba
commit 3d7e7c2c83
13 changed files with 352 additions and 54 deletions

View File

@ -775,7 +775,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false
"pauseCoordination": false,
"replicateAfterLoadTimeout": false
}
```
@ -799,6 +800,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run, which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will be moved neither from _nor to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode in which they do not participate in balancing or assignment by load rules. Decommissioning can also stall if there are no active servers available to place the segments on. By adjusting the maximum percentage of segment movements spent on decommissioning, an operator can either protect active servers from being overloaded (by prioritizing balancing) or shorten decommissioning time. The value should be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface, such as segment balancing, segment compaction, emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, and unloading segments that are no longer marked as used from Historical servers. For example, an admin may want to pause coordination while performing deep storage maintenance on HDFS Name Nodes that requires downtime, so that the coordinator does not direct Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve segment availability when there are a few slow historicals in the cluster. However, the slow historical may still load the segment later, in which case the coordinator may issue drop requests if the segment becomes over-replicated.|false|
To view the audit history of Coordinator dynamic config, issue a GET request to the URL -

View File

@ -88,6 +88,14 @@ public class CoordinatorDynamicConfig
private final int maxSegmentsInNodeLoadingQueue;
private final boolean pauseCoordination;
/**
* This decides whether additional replication is needed for segments that have failed to load due to a load timeout.
* When enabled, the coordinator will attempt to replicate the failed segment on a different historical server.
* The historical which failed to load the segment may still load the segment later. Therefore, enabling this setting
* works better if there are a few slow historicals in the cluster and segment availability needs to be sped up.
*/
private final boolean replicateAfterLoadTimeout;
private static final Logger log = new Logger(CoordinatorDynamicConfig.class);
@JsonCreator
@ -120,7 +128,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@ -166,6 +175,7 @@ public class CoordinatorDynamicConfig
);
}
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
}
private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@ -320,6 +330,12 @@ public class CoordinatorDynamicConfig
return pauseCoordination;
}
@JsonProperty
public boolean getReplicateAfterLoadTimeout()
{
return replicateAfterLoadTimeout;
}
@Override
public String toString()
{
@ -341,6 +357,7 @@ public class CoordinatorDynamicConfig
", decommissioningNodes=" + decommissioningNodes +
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
'}';
}
@ -402,6 +419,9 @@ public class CoordinatorDynamicConfig
if (pauseCoordination != that.pauseCoordination) {
return false;
}
if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) {
return false;
}
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
}
@ -449,6 +469,7 @@ public class CoordinatorDynamicConfig
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
@ -466,6 +487,7 @@ public class CoordinatorDynamicConfig
private Object decommissioningNodes;
private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
private Boolean pauseCoordination;
private Boolean replicateAfterLoadTimeout;
public Builder()
{
@ -490,7 +512,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove")
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@ -510,6 +533,7 @@ public class CoordinatorDynamicConfig
this.decommissioningNodes = decommissioningNodes;
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
}
public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
@ -602,6 +626,12 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withReplicateAfterLoadTimeout(boolean replicateAfterLoadTimeout)
{
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
return this;
}
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@ -629,7 +659,8 @@ public class CoordinatorDynamicConfig
decommissioningMaxPercentOfMaxSegmentsToMove == null
? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout
);
}
@ -663,7 +694,8 @@ public class CoordinatorDynamicConfig
decommissioningMaxPercentOfMaxSegmentsToMove == null
? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout
);
}
}
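
For orientation, here is a minimal usage sketch (not part of this PR) of the builder method and getter added above. The class name and builder members are taken from this file; the wrapper class and the package import are illustrative assumptions.

```java
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

// Sketch only: exercises the withReplicateAfterLoadTimeout() builder method and
// getReplicateAfterLoadTimeout() getter introduced in this change.
class ReplicateAfterLoadTimeoutConfigExample
{
  public static void main(String[] args)
  {
    // When the flag is not set, build() falls back to DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT (false).
    CoordinatorDynamicConfig config = CoordinatorDynamicConfig
        .builder()
        .withReplicateAfterLoadTimeout(true)
        .build();

    // The coordinator reads this flag each run (see the DruidCoordinator change below) to decide
    // whether timed-out segments should be excluded from the loading-replica count.
    System.out.println(config.getReplicateAfterLoadTimeout()); // prints: true
  }
}
```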

View File

@ -111,6 +111,15 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
/**
* Needs to be thread safe since it can be concurrently accessed via
* {@link #failAssign(SegmentHolder, boolean, Exception)}, {@link #actionCompleted(SegmentHolder)},
* {@link #getTimedOutSegments()} and {@link #stop()}
*/
private final ConcurrentSkipListSet<DataSegment> timedOutSegments = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
CuratorLoadQueuePeon(
CuratorFramework curator,
String basePath,
@ -149,6 +158,12 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
return segmentsMarkedToDrop;
}
@Override
public Set<DataSegment> getTimedOutSegments()
{
return timedOutSegments;
}
@Override
public long getLoadQueueSize()
{
@ -268,10 +283,10 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
// This is expected when historicals haven't yet picked up processing this segment and coordinator
// tries reassigning it to the same node.
log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
failAssign(segmentHolder);
failAssign(segmentHolder, true);
}
catch (Exception e) {
failAssign(segmentHolder, e);
failAssign(segmentHolder, false, e);
}
}
@ -282,14 +297,21 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
failAssign(
segmentHolder,
true,
new ISE("Failing this %s operation since it timed out and %s was never removed! These segments might still get processed",
segmentHolder.getType() == DROP ? "DROP" : "LOAD",
path
)
);
} else {
log.debug("%s detected to be removed. ", path);
}
}
catch (Exception e) {
log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
failAssign(segmentHolder, e);
failAssign(segmentHolder, false, e);
}
},
config.getLoadTimeoutDelay().getMillis(),
@ -307,10 +329,12 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
// See https://github.com/apache/druid/pull/10362 for more details.
if (null != segmentsToLoad.remove(segmentHolder.getSegment())) {
queuedSize.addAndGet(-segmentHolder.getSegmentSize());
timedOutSegments.remove(segmentHolder.getSegment());
}
break;
case DROP:
segmentsToDrop.remove(segmentHolder.getSegment());
timedOutSegments.remove(segmentHolder.getSegment());
break;
default:
throw new UnsupportedOperationException();
@ -337,6 +361,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
}
segmentsToLoad.clear();
timedOutSegments.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
}
@ -361,21 +386,33 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
);
}
private void failAssign(SegmentHolder segmentHolder)
private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout)
{
failAssign(segmentHolder, null);
failAssign(segmentHolder, handleTimeout, null);
}
private void failAssign(SegmentHolder segmentHolder, Exception e)
private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e)
{
if (e != null) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder);
}
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
if (handleTimeout) {
// Do not remove the segment entry from the load/drop queue when config.getLoadTimeoutDelay() expires:
// the ZK node is still present and may be processed after the timeout, so the coordinator
// needs to keep accounting for this request.
log.debug(
"Skipping segment removal from [%s] queue, since ZK Node still exists!",
segmentHolder.getType() == DROP ? "DROP" : "LOAD"
);
timedOutSegments.add(segmentHolder.getSegment());
executeCallbacks(segmentHolder);
} else {
// This may have failed for a different reason and so act like it was completed.
actionCompleted(segmentHolder);
}
}
private static class SegmentHolder
{
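
To summarize the behavioral change in this peon, the following is a condensed, hypothetical restatement of the failAssign() decision (simplified types and plain segment ids, not the actual class): a timeout keeps the load/drop queue entry and records the segment in a timed-out set while still firing the callbacks, whereas any other failure completes the action as before so the coordinator can hand the segment to another server.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical sketch of the failAssign() branch added above; the real peon works with
// SegmentHolder/DataSegment and ZK paths rather than plain segment ids and Runnables.
class TimeoutAwareFailureHandling
{
  // Thread safe, mirroring the ConcurrentSkipListSet used by CuratorLoadQueuePeon.
  private final Set<String> timedOutSegments = new ConcurrentSkipListSet<>();

  void failAssign(String segmentId, boolean handleTimeout, Runnable executeCallbacks, Runnable actionCompleted)
  {
    if (handleTimeout) {
      // Timeout: keep the load/drop queue entry (the ZK node may still be processed later),
      // only remember that the request timed out and notify the callbacks.
      timedOutSegments.add(segmentId);
      executeCallbacks.run();
    } else {
      // Any other failure: behave as before and release the queue entry.
      actionCompleted.run();
    }
  }

  Set<String> getTimedOutSegments()
  {
    return timedOutSegments;
  }
}
```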

View File

@ -853,7 +853,7 @@ public class DruidCoordinator
startPeonsForNewServers(currentServers);
final DruidCluster cluster = prepareCluster(params, currentServers);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout());
stopPeonsForDisappearedServers(currentServers);

View File

@ -428,6 +428,12 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
return Collections.unmodifiableSet(segmentsToDrop.keySet());
}
@Override
public Set<DataSegment> getTimedOutSegments()
{
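// Load-timeout tracking is implemented only for the Curator-based peon in this change,
// so the HTTP-based peon reports no timed-out segments.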
return Collections.emptySet();
}
@Override
public long getLoadQueueSize()
{

View File

@ -37,6 +37,8 @@ public abstract class LoadQueuePeon
public abstract Set<DataSegment> getSegmentsToDrop();
public abstract Set<DataSegment> getTimedOutSegments();
public abstract void unmarkSegmentToDrop(DataSegment segmentToLoad);

View File

@ -36,9 +36,15 @@ import java.util.SortedSet;
*/
public class SegmentReplicantLookup
{
public static SegmentReplicantLookup make(DruidCluster cluster)
public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicateAfterLoadTimeout)
{
final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
/**
* For each segment and tier, this table stores the number of replicants currently queued to load in {@code cluster}.
* Segments that failed to load due to the load timeout may be absent from this table when
* {@code replicateAfterLoadTimeout} is true, which enables additional replication of the timed out segments
* for improved availability.
*/
final Table<SegmentId, String, Integer> loadingSegments = HashBasedTable.create();
for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
@ -59,10 +65,14 @@ public class SegmentReplicantLookup
if (numReplicants == null) {
numReplicants = 0;
}
// Timed out segments need to be replicated on another server for faster availability.
// Therefore, we skip incrementing numReplicants for timed out segments when replicateAfterLoadTimeout is enabled.
if (!replicateAfterLoadTimeout || !serverHolder.getPeon().getTimedOutSegments().contains(segment)) {
loadingSegments.put(segment.getId(), server.getTier(), numReplicants + 1);
}
}
}
}
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, cluster);
}
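
The skip above is the heart of the over-replication behavior. Below is a hypothetical, stripped-down version of the counting rule (plain strings instead of SegmentId/DataSegment, and only the queued-load counting; not the actual Druid method): a segment that timed out on a server is not counted as loading when replicateAfterLoadTimeout is enabled, so the load rules see one replica too few and assign an extra copy elsewhere.

```java
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

import java.util.Set;

// Hypothetical distillation of the loading-replica count above, not the actual Druid method.
final class LoadingReplicaCountSketch
{
  // Counts one queued load of segmentId on tier, unless the segment timed out on that server
  // and replicateAfterLoadTimeout is enabled, in which case it is left uncounted so that an
  // additional replica can be assigned on another server.
  static void countQueuedLoad(
      Table<String, String, Integer> loadingSegments, // segmentId -> tier -> queued replica count
      String segmentId,
      String tier,
      Set<String> timedOutSegmentsOnServer,
      boolean replicateAfterLoadTimeout
  )
  {
    Integer numReplicants = loadingSegments.get(segmentId, tier);
    if (numReplicants == null) {
      numReplicants = 0;
    }
    if (!replicateAfterLoadTimeout || !timedOutSegmentsOnServer.contains(segmentId)) {
      loadingSegments.put(segmentId, tier, numReplicants + 1);
    }
  }

  public static void main(String[] args)
  {
    Table<String, String, Integer> loading = HashBasedTable.create();
    countQueuedLoad(loading, "seg1", "hot", Set.of("seg1"), true); // skipped: segment timed out
    countQueuedLoad(loading, "seg1", "hot", Set.of(), true);       // counted
    System.out.println(loading.get("seg1", "hot"));                // prints: 1
  }
}
```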

View File

@ -32,7 +32,7 @@ public class CoordinatorRuntimeParamsTestHelpers
{
return newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster));
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false));
}
private CoordinatorRuntimeParamsTestHelpers()

View File

@ -225,6 +225,7 @@ public class LoadQueuePeonTest extends CuratorTestBase
Assert.assertEquals(6000, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(5, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(5, loadQueuePeon.getSegmentsToDrop().size());
Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
for (DataSegment segment : segmentToDrop) {
String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
@ -268,12 +269,63 @@ public class LoadQueuePeonTest extends CuratorTestBase
}
@Test
public void testFailAssign() throws Exception
public void testFailAssignForNonTimeoutFailures() throws Exception
{
final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOAD_QUEUE_PATH,
// This will fail inside SegmentChangeProcessor.run()
null,
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration(1),
null,
null,
10,
new Duration("PT1s")
)
);
loadQueuePeon.start();
loadQueueCache.start();
loadQueuePeon.loadSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
segmentLoadedSignal.countDown();
}
}
);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
}
@Test
public void testFailAssignForLoadDropTimeout() throws Exception
{
final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
final CountDownLatch loadRequestSignal = new CountDownLatch(1);
final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
final CountDownLatch delayedSegmentLoadedSignal = new CountDownLatch(2);
final CountDownLatch loadRequestRemoveSignal = new CountDownLatch(1);
loadQueuePeon = new CuratorLoadQueuePeon(
@ -326,11 +378,13 @@ public class LoadQueuePeonTest extends CuratorTestBase
public void execute()
{
segmentLoadedSignal.countDown();
delayedSegmentLoadedSignal.countDown();
}
}
);
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
Assert.assertEquals(
@ -340,14 +394,20 @@ public class LoadQueuePeonTest extends CuratorTestBase
.getSegment()
);
// don't simulate completion of load request here
// leave the load request unprocessed so that it times out on the coordinator
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(1, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(1200L, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(1, loadQueuePeon.getTimedOutSegments().size());
// simulate completion of load request by historical after time out on coordinator
curator.delete().guaranteed().forPath(loadRequestPath);
Assert.assertTrue(timing.forWaiting().awaitLatch(delayedSegmentLoadedSignal));
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestRemoveSignal));
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size());
}
private DataSegment dataSegmentWithInterval(String intervalStr)

View File

@ -204,7 +204,7 @@ public class RunRulesTest
)
{
return createCoordinatorRuntimeParams(druidCluster, dataSegments)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false))
.withBalancerStrategy(balancerStrategy);
}
@ -336,7 +336,7 @@ public class RunRulesTest
.addTier("normal", new ServerHolder(normServer.toImmutableDruidServer(), mockPeon))
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -446,7 +446,7 @@ public class RunRulesTest
.build();
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false))
.withEmitter(emitter)
.build();
@ -488,7 +488,7 @@ public class RunRulesTest
.addTier("normal", new ServerHolder(server.toImmutableDruidServer(), mockPeon))
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -544,7 +544,7 @@ public class RunRulesTest
)
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -604,7 +604,7 @@ public class RunRulesTest
.addTier("normal", new ServerHolder(server2.toImmutableDruidServer(), mockPeon))
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -656,7 +656,7 @@ public class RunRulesTest
.addTier("normal", new ServerHolder(server2.toImmutableDruidServer(), mockPeon))
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -722,7 +722,7 @@ public class RunRulesTest
)
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -821,7 +821,7 @@ public class RunRulesTest
.withUsedSegmentsInTest(overFlowSegment)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false))
.build()
);
stats = afterParams.getCoordinatorStats();
@ -971,7 +971,7 @@ public class RunRulesTest
)
.build();
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
@ -1060,7 +1060,7 @@ public class RunRulesTest
.withDruidCluster(druidCluster)
.withUsedSegmentsInTest(usedSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false))
.withBalancerStrategy(balancerStrategy)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();

View File

@ -308,7 +308,7 @@ public class BroadcastDistributionRuleTest
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false))
.withUsedSegmentsInTest(usedSegments)
.build();
}

View File

@ -179,7 +179,22 @@ public class LoadRuleTest
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(usedSegments)
.build();
}
private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(
DruidCluster druidCluster,
DataSegment... usedSegments
)
{
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, true))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(usedSegments)
@ -226,7 +241,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
// ensure multiple runs don't assign primary segment again if at replication count
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment));
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment), false);
EasyMock.replay(loadingPeon);
DruidCluster afterLoad = DruidClusterBuilder
@ -245,6 +260,125 @@ public class LoadRuleTest
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testOverAssignForTimedOutSegments()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));
final DataSegment segment = createDataSegment("foo");
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(throttler, emptyPeon, mockBalancerStrategy);
ImmutableDruidServer server1 =
new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer();
ImmutableDruidServer server2 =
new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer();
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier("hot", new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.build();
CoordinatorStats stats = rule.run(
null,
makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(druidCluster, segment),
segment
);
// Ensure that the segment is assigned to one of the historicals
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
// Ensure that the primary segment is assigned again in case the peon timed out on loading the segment
final LoadQueuePeon slowLoadingPeon = createLoadingPeon(ImmutableList.of(segment), true);
EasyMock.replay(slowLoadingPeon);
DruidCluster withLoadTimeout = DruidClusterBuilder
.newBuilder()
.addTier("hot", new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon))
.build();
CoordinatorStats statsAfterLoadPrimary = rule.run(
null,
makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(withLoadTimeout, segment),
segment
);
Assert.assertEquals(1L, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy);
}
@Test
public void testSkipReplicationForTimedOutSegments()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));
final DataSegment segment = createDataSegment("foo");
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(throttler, emptyPeon, mockBalancerStrategy);
ImmutableDruidServer server1 =
new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer();
ImmutableDruidServer server2 =
new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer();
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier("hot", new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.build();
CoordinatorStats stats = rule.run(
null,
makeCoordinatorRuntimeParams(druidCluster, segment),
segment
);
// Ensure that the segment is assigned to one of the historicals
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
// Add the segment to the timed out list to simulate peon timeout on loading the segment
final LoadQueuePeon slowLoadingPeon = createLoadingPeon(ImmutableList.of(segment), true);
EasyMock.replay(slowLoadingPeon);
DruidCluster withLoadTimeout = DruidClusterBuilder
.newBuilder()
.addTier("hot", new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon))
.build();
// Default behavior is to not replicate the timed out segments on other servers
CoordinatorStats statsAfterLoadPrimary = rule.run(
null,
makeCoordinatorRuntimeParams(withLoadTimeout, segment),
segment
);
Assert.assertEquals(0L, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy);
}
@Test
public void testLoadPriority()
{
@ -390,7 +524,7 @@ public class LoadRuleTest
CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(segment)
@ -471,7 +605,7 @@ public class LoadRuleTest
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
@ -723,7 +857,7 @@ public class LoadRuleTest
return mockPeon;
}
private static LoadQueuePeon createLoadingPeon(List<DataSegment> segments)
private static LoadQueuePeon createLoadingPeon(List<DataSegment> segments, boolean slowLoading)
{
final Set<DataSegment> segs = ImmutableSet.copyOf(segments);
final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum();
@ -734,6 +868,12 @@ public class LoadRuleTest
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes();
if (slowLoading) {
EasyMock.expect(mockPeon.getTimedOutSegments()).andReturn(new HashSet<>(segments)).anyTimes();
} else {
EasyMock.expect(mockPeon.getTimedOutSegments()).andReturn(new HashSet<>()).anyTimes();
}
return mockPeon;
}

View File

@ -53,7 +53,8 @@ public class CoordinatorDynamicConfigTest
+ " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
+ " \"pauseCoordination\": false\n"
+ " \"pauseCoordination\": false,\n"
+ " \"replicateAfterLoadTimeout\": false\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@ -67,19 +68,23 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false);
actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false);
actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false);
actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true);
}
@Test
@ -109,13 +114,13 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false);
}
@Test
@ -160,6 +165,7 @@ public class CoordinatorDynamicConfigTest
1,
ImmutableSet.of(),
0,
false,
false
);
}
@ -260,7 +266,7 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false);
}
@Test
@ -290,7 +296,7 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false);
//ensure whitelist is empty when killAllDataSources is true
try {
@ -337,7 +343,7 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false);
}
@Test
@ -361,6 +367,7 @@ public class CoordinatorDynamicConfigTest
0,
emptyList,
70,
false,
false
);
}
@ -376,7 +383,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(
current,
new CoordinatorDynamicConfig
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.build(current)
);
}
@ -406,7 +413,8 @@ public class CoordinatorDynamicConfigTest
int expectedMaxSegmentsInNodeLoadingQueue,
Set<String> decommissioningNodes,
int decommissioningMaxPercentOfMaxSegmentsToMove,
boolean pauseCoordination
boolean pauseCoordination,
boolean replicateAfterLoadTimeout
)
{
Assert.assertEquals(
@ -433,5 +441,6 @@ public class CoordinatorDynamicConfigTest
config.getDecommissioningMaxPercentOfMaxSegmentsToMove()
);
Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
}
}