Add a builder class for TestDruidCoordinatorConfig (#12624)

* Add a builder class for TestDruidCoordinatorConfig

* updates after review

* Fix formatting
This commit is contained in:
Lucas Capistrant 2022-06-16 09:11:31 -05:00 committed by GitHub
parent 94564b6ce6
commit 602d95d865
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 549 additions and 813 deletions

View File

@ -165,27 +165,14 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH); curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
objectMapper = new DefaultObjectMapper(); objectMapper = new DefaultObjectMapper();
druidCoordinatorConfig = new TestDruidCoordinatorConfig( druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
new Duration(COORDINATOR_START_DELAY), .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
new Duration(COORDINATOR_PERIOD), .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
null, .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
new Duration(COORDINATOR_PERIOD), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT0s"),
false
);
sourceLoadQueueChildrenCache = new PathChildrenCache( sourceLoadQueueChildrenCache = new PathChildrenCache(
curator, curator,
SOURCE_LOAD_PATH, SOURCE_LOAD_PATH,

View File

@ -145,27 +145,13 @@ public class DruidCoordinatorTest extends CuratorTestBase
curator.blockUntilConnected(); curator.blockUntilConnected();
curator.create().creatingParentsIfNeeded().forPath(LOADPATH); curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
objectMapper = new DefaultObjectMapper(); objectMapper = new DefaultObjectMapper();
druidCoordinatorConfig = new TestDruidCoordinatorConfig( druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
new Duration(COORDINATOR_START_DELAY), .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
new Duration(COORDINATOR_PERIOD), .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
null, .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
null, .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
null, .withCoordinatorKillIgnoreDurationToRetain(false)
new Duration(COORDINATOR_PERIOD), .build();
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT0s"),
false
);
pathChildrenCache = new PathChildrenCache( pathChildrenCache = new PathChildrenCache(
curator, curator,
LOADPATH, LOADPATH,
@ -925,28 +911,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test @Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments() public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{ {
DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig( DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig.Builder()
new Duration(COORDINATOR_START_DELAY), .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
new Duration(COORDINATOR_PERIOD), .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
null, .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
new Duration(COORDINATOR_PERIOD), .withCompactionSkippedLockedIntervals(false)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT0s"),
false,
false
);
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null))); CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null)));
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup)); CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
coordinator = new DruidCoordinator( coordinator = new DruidCoordinator(

View File

@ -73,34 +73,12 @@ public class HttpLoadQueuePeonTest
null, null, null, null, 0, 0 null, null, null, null, 0, 0
); );
final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig( final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(Duration.ZERO)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .withHttpLoadQueuePeonBatchSize(2)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
Duration.ZERO,
false
)
{
@Override
public int getHttpLoadQueuePeonBatchSize()
{
return 2;
}
};
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testSimple() throws Exception public void testSimple() throws Exception

View File

@ -88,27 +88,11 @@ public class LoadQueuePeonTest extends CuratorTestBase
jsonMapper, jsonMapper,
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"), Execs.singleThreaded("test_load_queue_peon-%d"),
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(Duration.millis(0))
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
Duration.millis(0),
false
)
); );
loadQueuePeon.start(); loadQueuePeon.start();
@ -294,27 +278,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"), Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withLoadTimeoutDelay(new Duration(1))
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
null, .withCoordinatorKillIgnoreDurationToRetain(false)
new Duration(1), .build()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT1s"),
false
)
); );
loadQueuePeon.start(); loadQueuePeon.start();
@ -357,27 +326,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"), Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withLoadTimeoutDelay(new Duration(1))
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
null, .withCoordinatorKillIgnoreDurationToRetain(false)
new Duration(1), .build()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT1s"),
false
)
); );
loadQueuePeon.start(); loadQueuePeon.start();

View File

@ -37,27 +37,12 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
null, null,
Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"), Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withLoadTimeoutDelay(new Duration(1))
null, .withCoordinatorKillMaxSegments(10)
null, .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
null, .withCoordinatorKillIgnoreDurationToRetain(false)
new Duration(1), .build()
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT1s"),
false
)
); );
} }

View File

@ -39,10 +39,15 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration coordinatorRuleKillDurationToRetain; private final Duration coordinatorRuleKillDurationToRetain;
private final Duration coordinatorDatasourceKillPeriod; private final Duration coordinatorDatasourceKillPeriod;
private final Duration coordinatorDatasourceKillDurationToRetain; private final Duration coordinatorDatasourceKillDurationToRetain;
private final Duration getLoadQueuePeonRepeatDelay; private final Duration loadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments; private final int coordinatorKillMaxSegments;
private final boolean compactionSkipLockedIntervals; private final boolean compactionSkipLockedIntervals;
private final boolean coordinatorKillIgnoreDurationToRetain; private final boolean coordinatorKillIgnoreDurationToRetain;
private final String loadQueuePeonType;
private final Duration httpLoadQueuePeonRepeatDelay;
private final int curatorLoadQueuePeonNumCallbackThreads;
private final Duration httpLoadQueuePeonHostTimeout;
private final int httpLoadQueuePeonBatchSize;
public TestDruidCoordinatorConfig( public TestDruidCoordinatorConfig(
Duration coordinatorStartDelay, Duration coordinatorStartDelay,
@ -62,53 +67,14 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
Duration coordinatorDatasourceKillPeriod, Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain, Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments, int coordinatorKillMaxSegments,
Duration getLoadQueuePeonRepeatDelay, Duration loadQueuePeonRepeatDelay,
boolean coordinatorKillIgnoreDurationToRetain
)
{
this.coordinatorStartDelay = coordinatorStartDelay;
this.coordinatorPeriod = coordinatorPeriod;
this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
this.loadTimeoutDelay = loadTimeoutDelay;
this.coordinatorKillPeriod = coordinatorKillPeriod;
this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod;
this.coordinatorSupervisorKillDurationToRetain = coordinatorSupervisorKillDurationToRetain;
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod;
this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain;
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = true;
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
}
public TestDruidCoordinatorConfig(
Duration coordinatorStartDelay,
Duration coordinatorPeriod,
Duration coordinatorIndexingPeriod,
Duration metadataStoreManagementPeriod,
Duration loadTimeoutDelay,
Duration coordinatorKillPeriod,
Duration coordinatorKillDurationToRetain,
Duration coordinatorSupervisorKillPeriod,
Duration coordinatorSupervisorKillDurationToRetain,
Duration coordinatorAuditKillPeriod,
Duration coordinatorAuditKillDurationToRetain,
Duration coordinatorCompactionKillPeriod,
Duration coordinatorRuleKillPeriod,
Duration coordinatorRuleKillDurationToRetain,
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
Duration getLoadQueuePeonRepeatDelay,
boolean compactionSkipLockedIntervals, boolean compactionSkipLockedIntervals,
boolean coordinatorKillIgnoreDurationToRetain boolean coordinatorKillIgnoreDurationToRetain,
String loadQueuePeonType,
Duration httpLoadQueuePeonRepeatDelay,
Duration httpLoadQueuePeonHostTimeout,
int httpLoadQueuePeonBatchSize,
int curatorLoadQueuePeonNumCallbackThreads
) )
{ {
this.coordinatorStartDelay = coordinatorStartDelay; this.coordinatorStartDelay = coordinatorStartDelay;
@ -128,9 +94,14 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod; this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals; this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain; this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
this.loadQueuePeonType = loadQueuePeonType;
this.httpLoadQueuePeonRepeatDelay = httpLoadQueuePeonRepeatDelay;
this.httpLoadQueuePeonHostTimeout = httpLoadQueuePeonHostTimeout;
this.httpLoadQueuePeonBatchSize = httpLoadQueuePeonBatchSize;
this.curatorLoadQueuePeonNumCallbackThreads = curatorLoadQueuePeonNumCallbackThreads;
} }
@Override @Override
@ -238,7 +209,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
@Override @Override
public Duration getLoadQueuePeonRepeatDelay() public Duration getLoadQueuePeonRepeatDelay()
{ {
return getLoadQueuePeonRepeatDelay; return loadQueuePeonRepeatDelay;
} }
@Override @Override
@ -252,4 +223,277 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
{ {
return coordinatorKillIgnoreDurationToRetain; return coordinatorKillIgnoreDurationToRetain;
} }
@Override
public String getLoadQueuePeonType()
{
return loadQueuePeonType;
}
@Override
public Duration getHttpLoadQueuePeonRepeatDelay()
{
return httpLoadQueuePeonRepeatDelay;
}
@Override
public int getNumCuratorCallBackThreads()
{
return curatorLoadQueuePeonNumCallbackThreads;
}
@Override
public Duration getHttpLoadQueuePeonHostTimeout()
{
return httpLoadQueuePeonHostTimeout;
}
@Override
public int getHttpLoadQueuePeonBatchSize()
{
return httpLoadQueuePeonBatchSize;
}
public static class Builder
{
private static final Duration DEFAULT_COORDINATOR_START_DELAY = new Duration("PT300s");
private static final Duration DEFAULT_COORDINATOR_PERIOD = new Duration("PT60s");
private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = new Duration("PT1800s");
private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = new Duration("PT3600s");
private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION = new Duration("PT7776000s");
private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false;
private static final int DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS = 100;
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_COMPACTION_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15 * 60 * 1000);
private static final Duration DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(50);
private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator";
private static final int DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2;
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(60000);
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT = Duration.millis(300000);
private static final int DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE = 1;
private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS = true;
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private Duration coordinatorStartDelay;
private Duration coordinatorPeriod;
private Duration coordinatorIndexingPeriod;
private Duration metadataStoreManagementPeriod;
private Duration coordinatorKillPeriod;
private Duration coordinatorKillDurationToRetain;
private Boolean coordinatorKillIgnoreDurationToRetain;
private Integer coordinatorKillMaxSegments;
private Duration coordinatorSupervisorKillPeriod;
private Duration coordinatorSupervisorKillDurationToRetain;
private Duration coordinatorCompactionKillPeriod;
private Duration coordinatorRuleKillPeriod;
private Duration coordinatorRuleKillDurationToRetain;
private Duration coordinatorDatasourceKillPeriod;
private Duration coordinatorDatasourceKillDurationToRetain;
private Duration loadTimeoutDelay;
private Duration loadQueuePeonRepeatDelay;
private String loadQueuePeonType;
private Duration httpLoadQueuePeonRepeatDelay;
private Integer curatorLoadQueuePeonNumCallbackThreads;
private Duration httpLoadQueuePeonHostTimeout;
private Integer httpLoadQueuePeonBatchSize;
private Boolean compactionSkippedLockedIntervals;
private Duration coordinatorAuditKillPeriod;
private Duration coordinatorAuditKillDurationToRetain;
public Builder()
{
}
public Builder withCoordinatorStartDelay(Duration coordinatorStartDelay)
{
this.coordinatorStartDelay = coordinatorStartDelay;
return this;
}
public Builder withCoordinatorPeriod(Duration coordinatorPeriod)
{
this.coordinatorPeriod = coordinatorPeriod;
return this;
}
public Builder withCoordinatorIndexingPeriod(Duration coordinatorIndexingPeriod)
{
this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
return this;
}
public Builder withMetadataStoreManagementPeriod(Duration metadataStoreManagementPeriod)
{
this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
return this;
}
public Builder withCoordinatorKillPeriod(Duration coordinatorKillPeriod)
{
this.coordinatorKillPeriod = coordinatorKillPeriod;
return this;
}
public Builder withCoordinatorKillDurationToRetain(Duration coordinatorKillDurationToRetain)
{
this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
return this;
}
public Builder withCoordinatorKillIgnoreDurationToRetain(boolean coordinatorKillIgnoreDurationToRetain)
{
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
return this;
}
public Builder withCoordinatorKillMaxSegments(int coordinatorKillMaxSegments)
{
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
return this;
}
public Builder withCoordinatorSupervisorKillPeriod(Duration coordinatorSupervisorKillPeriod)
{
this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod;
return this;
}
public Builder withCoordinatorSupervisorKillDurationToRetain(Duration coordinatorSupervisorKillDurationToRetain)
{
this.coordinatorSupervisorKillDurationToRetain = coordinatorSupervisorKillDurationToRetain;
return this;
}
public Builder withCoordinatorCompactionKillPeriod(Duration coordinatorCompactionKillPeriod)
{
this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod;
return this;
}
public Builder withCoordinatorRuleKillPeriod(Duration coordinatorRuleKillPeriod)
{
this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
return this;
}
public Builder withCoordinatorRuleKillDurationToRetain(Duration coordinatorRuleKillDurationToRetain)
{
this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain;
return this;
}
public Builder withCoordinatorDatasourceKillPeriod(Duration coordinatorDatasourceKillPeriod)
{
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
return this;
}
public Builder withCoordinatorDatasourceKillDurationToRetain(Duration coordinatorDatasourceKillDurationToRetain)
{
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
return this;
}
public Builder withLoadTimeoutDelay(Duration loadTimeoutDelay)
{
this.loadTimeoutDelay = loadTimeoutDelay;
return this;
}
public Builder withLoadQueuePeonRepeatDelay(Duration loadQueuePeonRepeatDelay)
{
this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
return this;
}
public Builder withLoadQueuePeonType(String loadQueuePeonType)
{
this.loadQueuePeonType = loadQueuePeonType;
return this;
}
public Builder withHttpLoadQueuePeonRepeatDelay(Duration httpLoadQueuePeonRepeatDelay)
{
this.httpLoadQueuePeonRepeatDelay = httpLoadQueuePeonRepeatDelay;
return this;
}
public Builder withCuratorLoadQueuePeonNumCallbackThreads(int curatorLoadQueuePeonNumCallbackThreads)
{
this.curatorLoadQueuePeonNumCallbackThreads = curatorLoadQueuePeonNumCallbackThreads;
return this;
}
public Builder withHttpLoadQueuePeonHostTimeout(Duration httpLoadQueuePeonHostTimeout)
{
this.httpLoadQueuePeonHostTimeout = httpLoadQueuePeonHostTimeout;
return this;
}
public Builder withHttpLoadQueuePeonBatchSize(int httpLoadQueuePeonBatchSize)
{
this.httpLoadQueuePeonBatchSize = httpLoadQueuePeonBatchSize;
return this;
}
public Builder withCompactionSkippedLockedIntervals(boolean compactionSkippedLockedIntervals)
{
this.compactionSkippedLockedIntervals = compactionSkippedLockedIntervals;
return this;
}
public Builder withCoordianatorAuditKillPeriod(Duration coordinatorAuditKillPeriod)
{
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
return this;
}
public Builder withCoordinatorAuditKillDurationToRetain(Duration coordinatorAuditKillDurationToRetain)
{
this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
return this;
}
public TestDruidCoordinatorConfig build()
{
return new TestDruidCoordinatorConfig(
coordinatorStartDelay == null ? DEFAULT_COORDINATOR_START_DELAY : coordinatorStartDelay,
coordinatorPeriod == null ? DEFAULT_COORDINATOR_PERIOD : coordinatorPeriod,
coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod,
metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod,
loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay,
coordinatorKillPeriod == null ? DEFAULT_COORDINATOR_KILL_PERIOD : coordinatorKillPeriod,
coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION : coordinatorKillDurationToRetain,
coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod,
coordinatorSupervisorKillDurationToRetain == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_DURATION_TO_RETAIN : coordinatorSupervisorKillDurationToRetain,
coordinatorAuditKillPeriod == null ? DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD : coordinatorAuditKillPeriod,
coordinatorAuditKillDurationToRetain == null ? DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN : coordinatorAuditKillDurationToRetain,
coordinatorCompactionKillPeriod == null ? DEFAULT_COORDINATOR_COMPACTION_KILL_PERIOD : coordinatorCompactionKillPeriod,
coordinatorRuleKillPeriod == null ? DEFAULT_COORDINATOR_RULE_KILL_PERIOD : coordinatorRuleKillPeriod,
coordinatorRuleKillDurationToRetain == null ? DEFAULT_COORDINATOR_RULE_KILL_DURATION_TO_RETAIN : coordinatorRuleKillDurationToRetain,
coordinatorDatasourceKillPeriod == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
coordinatorDatasourceKillDurationToRetain == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN : coordinatorDatasourceKillDurationToRetain,
coordinatorKillMaxSegments == null ? DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
loadQueuePeonRepeatDelay == null ? DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY : loadQueuePeonRepeatDelay,
compactionSkippedLockedIntervals == null ? DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
coordinatorKillIgnoreDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN : coordinatorKillIgnoreDurationToRetain,
loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE : loadQueuePeonType,
httpLoadQueuePeonRepeatDelay == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY : httpLoadQueuePeonRepeatDelay,
httpLoadQueuePeonHostTimeout == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT : httpLoadQueuePeonHostTimeout,
httpLoadQueuePeonBatchSize == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE : httpLoadQueuePeonBatchSize,
curatorLoadQueuePeonNumCallbackThreads == null ? DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS
: curatorLoadQueuePeonNumCallbackThreads
);
}
}
} }

View File

@ -54,27 +54,13 @@ public class KillAuditLogTest
@Test @Test
public void testRunSkipIfLastRunLessThanPeriod() public void testRunSkipIfLastRunLessThanPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5s"))
null, .withCoordianatorAuditKillPeriod(new Duration(Long.MAX_VALUE))
null, .withCoordinatorAuditKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
new Duration(Long.MAX_VALUE),
new Duration("PT1S"),
null,
null,
null,
null,
null,
10,
null,
false
);
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
killAuditLog.run(mockDruidCoordinatorRuntimeParams); killAuditLog.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockAuditManager); Mockito.verifyNoInteractions(mockAuditManager);
@ -84,27 +70,13 @@ public class KillAuditLogTest
public void testRunNotSkipIfLastRunMoreThanPeriod() public void testRunNotSkipIfLastRunMoreThanPeriod()
{ {
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5s"))
null, .withCoordianatorAuditKillPeriod(new Duration("PT6S"))
null, .withCoordinatorAuditKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
new Duration("PT6S"),
new Duration("PT1S"),
null,
null,
null,
null,
null,
10,
null,
false
);
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
killAuditLog.run(mockDruidCoordinatorRuntimeParams); killAuditLog.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong()); Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
@ -114,27 +86,13 @@ public class KillAuditLogTest
@Test @Test
public void testConstructorFailIfInvalidPeriod() public void testConstructorFailIfInvalidPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5s"))
null, .withCoordianatorAuditKillPeriod(new Duration("PT3S"))
null, .withCoordinatorAuditKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
new Duration("PT3S"),
new Duration("PT1S"),
null,
null,
null,
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
@ -143,27 +101,13 @@ public class KillAuditLogTest
@Test @Test
public void testConstructorFailIfInvalidRetainDuration() public void testConstructorFailIfInvalidRetainDuration()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordianatorAuditKillPeriod(new Duration("PT6S"))
null, .withCoordinatorAuditKillDurationToRetain(new Duration("PT-1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
new Duration("PT6S"),
new Duration("PT-1S"),
null,
null,
null,
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator audit kill retainDuration must be >= 0"); exception.expectMessage("coordinator audit kill retainDuration must be >= 0");
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);

View File

@ -86,27 +86,12 @@ public class KillCompactionConfigTest
@Test @Test
public void testRunSkipIfLastRunLessThanPeriod() public void testRunSkipIfLastRunLessThanPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorCompactionKillPeriod(new Duration(Long.MAX_VALUE))
null, .withCoordinatorKillMaxSegments(10)
new Duration("PT5S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration(Long.MAX_VALUE),
null,
null,
null,
null,
10,
null,
false
);
killCompactionConfig = new KillCompactionConfig( killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig, druidCoordinatorConfig,
mockSqlSegmentsMetadataManager, mockSqlSegmentsMetadataManager,
@ -123,27 +108,12 @@ public class KillCompactionConfigTest
@Test @Test
public void testConstructorFailIfInvalidPeriod() public void testConstructorFailIfInvalidPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorCompactionKillPeriod(new Duration("PT3S"))
null, .withCoordinatorKillMaxSegments(10)
new Duration("PT5S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT3S"),
null,
null,
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killCompactionConfig = new KillCompactionConfig( killCompactionConfig = new KillCompactionConfig(
@ -173,27 +143,12 @@ public class KillCompactionConfigTest
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(CoordinatorCompactionConfig.empty()); ).thenReturn(CoordinatorCompactionConfig.empty());
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorCompactionKillPeriod(new Duration("PT6S"))
null, .withCoordinatorKillMaxSegments(10)
new Duration("PT5S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
null,
null,
null,
null,
10,
null,
false
);
killCompactionConfig = new KillCompactionConfig( killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig, druidCoordinatorConfig,
mockSqlSegmentsMetadataManager, mockSqlSegmentsMetadataManager,
@ -279,27 +234,12 @@ public class KillCompactionConfigTest
ArgumentMatchers.any()) ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok()); ).thenReturn(ConfigManager.SetResult.ok());
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorCompactionKillPeriod(new Duration("PT6S"))
null, .withCoordinatorKillMaxSegments(10)
new Duration("PT5S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
null,
null,
null,
null,
10,
null,
false
);
killCompactionConfig = new KillCompactionConfig( killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig, druidCoordinatorConfig,
mockSqlSegmentsMetadataManager, mockSqlSegmentsMetadataManager,
@ -399,27 +339,12 @@ public class KillCompactionConfigTest
} }
}); });
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorCompactionKillPeriod(new Duration("PT6S"))
null, .withCoordinatorKillMaxSegments(10)
new Duration("PT5S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
null,
null,
null,
null,
10,
null,
false
);
killCompactionConfig = new KillCompactionConfig( killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig, druidCoordinatorConfig,
mockSqlSegmentsMetadataManager, mockSqlSegmentsMetadataManager,

View File

@ -63,27 +63,13 @@ public class KillDatasourceMetadataTest
@Test @Test
public void testRunSkipIfLastRunLessThanPeriod() public void testRunSkipIfLastRunLessThanPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorDatasourceKillPeriod(new Duration(Long.MAX_VALUE))
null, .withCoordinatorDatasourceKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
new Duration(Long.MAX_VALUE),
new Duration("PT1S"),
10,
null,
false
);
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockIndexerMetadataStorageCoordinator); Mockito.verifyNoInteractions(mockIndexerMetadataStorageCoordinator);
@ -95,27 +81,13 @@ public class KillDatasourceMetadataTest
{ {
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorDatasourceKillPeriod(new Duration("PT6S"))
null, .withCoordinatorDatasourceKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
new Duration("PT1S"),
10,
null,
false
);
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet()); Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet());
@ -125,27 +97,13 @@ public class KillDatasourceMetadataTest
@Test @Test
public void testConstructorFailIfInvalidPeriod() public void testConstructorFailIfInvalidPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorDatasourceKillPeriod(new Duration("PT3S"))
null, .withCoordinatorDatasourceKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
new Duration("PT3S"),
new Duration("PT1S"),
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
@ -154,27 +112,13 @@ public class KillDatasourceMetadataTest
@Test @Test
public void testConstructorFailIfInvalidRetainDuration() public void testConstructorFailIfInvalidRetainDuration()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorDatasourceKillPeriod(new Duration("PT6S"))
null, .withCoordinatorDatasourceKillDurationToRetain(new Duration("PT-1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
new Duration("PT-1S"),
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0"); exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0");
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
@ -185,27 +129,13 @@ public class KillDatasourceMetadataTest
{ {
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorDatasourceKillPeriod(new Duration("PT6S"))
null, .withCoordinatorDatasourceKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
new Duration("PT1S"),
10,
null,
false
);
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of())); Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of()));

View File

@ -61,27 +61,13 @@ public class KillRulesTest
@Test @Test
public void testRunSkipIfLastRunLessThanPeriod() public void testRunSkipIfLastRunLessThanPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorRuleKillPeriod(new Duration(Long.MAX_VALUE))
null, .withCoordinatorRuleKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration(Long.MAX_VALUE),
new Duration("PT1S"),
null,
null,
10,
null,
false
);
killRules = new KillRules(druidCoordinatorConfig); killRules = new KillRules(druidCoordinatorConfig);
killRules.run(mockDruidCoordinatorRuntimeParams); killRules.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockRuleManager); Mockito.verifyNoInteractions(mockRuleManager);
@ -91,27 +77,13 @@ public class KillRulesTest
public void testRunNotSkipIfLastRunMoreThanPeriod() public void testRunNotSkipIfLastRunMoreThanPeriod()
{ {
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorRuleKillPeriod(new Duration("PT6S"))
null, .withCoordinatorRuleKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
new Duration("PT1S"),
null,
null,
10,
null,
false
);
killRules = new KillRules(druidCoordinatorConfig); killRules = new KillRules(druidCoordinatorConfig);
killRules.run(mockDruidCoordinatorRuntimeParams); killRules.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockRuleManager).removeRulesForEmptyDatasourcesOlderThan(ArgumentMatchers.anyLong()); Mockito.verify(mockRuleManager).removeRulesForEmptyDatasourcesOlderThan(ArgumentMatchers.anyLong());
@ -121,27 +93,13 @@ public class KillRulesTest
@Test @Test
public void testConstructorFailIfInvalidPeriod() public void testConstructorFailIfInvalidPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorRuleKillPeriod(new Duration("PT3S"))
null, .withCoordinatorRuleKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT3S"),
new Duration("PT1S"),
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator rule kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); exception.expectMessage("coordinator rule kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killRules = new KillRules(druidCoordinatorConfig); killRules = new KillRules(druidCoordinatorConfig);
@ -150,27 +108,13 @@ public class KillRulesTest
@Test @Test
public void testConstructorFailIfInvalidRetainDuration() public void testConstructorFailIfInvalidRetainDuration()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorRuleKillPeriod(new Duration("PT6S"))
null, .withCoordinatorRuleKillDurationToRetain(new Duration("PT-1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
null,
null,
null,
null,
null,
new Duration("PT6S"),
new Duration("PT-1S"),
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator rule kill retainDuration must be >= 0"); exception.expectMessage("coordinator rule kill retainDuration must be >= 0");
killRules = new KillRules(druidCoordinatorConfig); killRules = new KillRules(druidCoordinatorConfig);

View File

@ -54,27 +54,13 @@ public class KillSupervisorsTest
@Test @Test
public void testRunSkipIfLastRunLessThanPeriod() public void testRunSkipIfLastRunLessThanPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorSupervisorKillPeriod(new Duration(Long.MAX_VALUE))
null, .withCoordinatorSupervisorKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
new Duration(Long.MAX_VALUE),
new Duration("PT1S"),
null,
null,
null,
null,
null,
null,
null,
10,
null,
false
);
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);
killSupervisors.run(mockDruidCoordinatorRuntimeParams); killSupervisors.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockMetadataSupervisorManager); Mockito.verifyNoInteractions(mockMetadataSupervisorManager);
@ -84,27 +70,13 @@ public class KillSupervisorsTest
public void testRunNotSkipIfLastRunMoreThanPeriod() public void testRunNotSkipIfLastRunMoreThanPeriod()
{ {
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorSupervisorKillPeriod(new Duration("PT6S"))
null, .withCoordinatorSupervisorKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
new Duration("PT6S"),
new Duration("PT1S"),
null,
null,
null,
null,
null,
null,
null,
10,
null,
false
);
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);
killSupervisors.run(mockDruidCoordinatorRuntimeParams); killSupervisors.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong()); Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
@ -114,27 +86,13 @@ public class KillSupervisorsTest
@Test @Test
public void testConstructorFailIfInvalidPeriod() public void testConstructorFailIfInvalidPeriod()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorSupervisorKillPeriod(new Duration("PT3S"))
null, .withCoordinatorSupervisorKillDurationToRetain(new Duration("PT1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
new Duration("PT3S"),
new Duration("PT1S"),
null,
null,
null,
null,
null,
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator supervisor kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); exception.expectMessage("Coordinator supervisor kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);
@ -143,27 +101,13 @@ public class KillSupervisorsTest
@Test @Test
public void testConstructorFailIfInvalidRetainDuration() public void testConstructorFailIfInvalidRetainDuration()
{ {
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
null, .withMetadataStoreManagementPeriod(new Duration("PT5S"))
null, .withCoordinatorSupervisorKillPeriod(new Duration("PT6S"))
null, .withCoordinatorSupervisorKillDurationToRetain(new Duration("PT-1S"))
new Duration("PT5S"), .withCoordinatorKillMaxSegments(10)
null, .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build();
null,
new Duration("PT6S"),
new Duration("PT-1S"),
null,
null,
null,
null,
null,
null,
null,
10,
null,
false
);
exception.expect(IllegalArgumentException.class); exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator supervisor kill retainDuration must be >= 0"); exception.expectMessage("Coordinator supervisor kill retainDuration must be >= 0");
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);

View File

@ -176,27 +176,15 @@ public class KillUnusedSegmentsTest
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
segmentsMetadataManager, segmentsMetadataManager,
indexingServiceClient, indexingServiceClient,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT86400S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
false
)
); );
Assert.assertEquals( Assert.assertEquals(
@ -217,27 +205,15 @@ public class KillUnusedSegmentsTest
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
null, null,
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT86400S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
false
)
); );
Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), unusedSegmentsKiller.getRetainDuration()); Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
@ -245,27 +221,15 @@ public class KillUnusedSegmentsTest
unusedSegmentsKiller = new KillUnusedSegments( unusedSegmentsKiller = new KillUnusedSegments(
null, null,
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT-86400S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
false
)
); );
Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), unusedSegmentsKiller.getRetainDuration()); Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
} }
@ -282,27 +246,15 @@ public class KillUnusedSegmentsTest
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
null, null,
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT86400S"), .withCoordinatorKillIgnoreDurationToRetain(true)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
true
)
); );
Assert.assertEquals( Assert.assertEquals(
DateTimes.COMPARE_DATE_AS_STRING_MAX, DateTimes.COMPARE_DATE_AS_STRING_MAX,
@ -313,27 +265,15 @@ public class KillUnusedSegmentsTest
unusedSegmentsKiller = new KillUnusedSegments( unusedSegmentsKiller = new KillUnusedSegments(
null, null,
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT-86400S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
false
)
); );
DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis()); DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
@ -343,27 +283,15 @@ public class KillUnusedSegmentsTest
unusedSegmentsKiller = new KillUnusedSegments( unusedSegmentsKiller = new KillUnusedSegments(
null, null,
null, null,
new TestDruidCoordinatorConfig( new TestDruidCoordinatorConfig.Builder()
null, .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
null, .withLoadTimeoutDelay(new Duration(1))
Duration.parse("PT76400S"), .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
null, .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
new Duration(1), .withCoordinatorKillMaxSegments(1000)
Duration.parse("PT86400S"), .withLoadQueuePeonRepeatDelay(Duration.ZERO)
Duration.parse("PT86400S"), .withCoordinatorKillIgnoreDurationToRetain(false)
null, .build()
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO,
false
)
); );
expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis()); expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit()); Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());