Add feature to automatically remove audit logs based on retention period (#11084)

* add docs

* add impl

* fix checkstyle

* fix test

* add test

* fix checkstyle

* fix checkstyle

* fix test

* Address comments

* Address comments

* fix spelling

* fix docs
Author: Maytas Monsereenusorn, 2021-04-20 17:10:43 -07:00, committed by GitHub
parent 09dcf6aa36
commit 6d2b5cdd7e
19 changed files with 494 additions and 15 deletions


@@ -91,4 +91,12 @@ public interface AuditManager
* @return list of AuditEntries satisfying the passed parameters
*/
List<AuditEntry> fetchAuditHistory(String type, int limit);
/**
* Removes audit logs that were created before the given timestamp.
*
* @param timestamp cutoff timestamp in milliseconds
* @return number of audit logs removed
*/
int removeAuditLogsOlderThan(long timestamp);
}
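As a rough usage sketch of the new contract (illustrative code, not part of this change): a caller enforcing a retention window computes the cutoff as "now minus retention" in epoch milliseconds and passes it to `removeAuditLogsOlderThan`. The helper name below is hypothetical.

```java
import org.apache.druid.audit.AuditManager;
import org.joda.time.Duration;

public class AuditRetentionExample
{
  // Hypothetical helper: delete all audit logs older than the given retention window.
  static int enforceRetention(AuditManager auditManager, Duration retention)
  {
    // Cutoff is "now minus retention", in epoch milliseconds, matching the
    // removeAuditLogsOlderThan(long) contract declared above.
    long cutoffMillis = System.currentTimeMillis() - retention.getMillis();
    return auditManager.removeAuditLogsOlderThan(cutoffMillis);
  }
}
```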


@@ -742,6 +742,15 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator process should act like an Overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone Overlord processes. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord processes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|

##### Metadata Management

|Property|Description|Required?|Default|
|--------|-----------|---------|-------|
|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run metadata management tasks, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format.|No|`PT1H`|
|`druid.coordinator.kill.audit.on`|Boolean value for whether to enable automatic deletion of audit logs. If set to true, the Coordinator periodically removes audit log entries from the audit table in metadata storage.|No|`false`|
|`druid.coordinator.kill.audit.period`|How often automatic deletion of audit logs runs, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Must be greater than or equal to `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.audit.on` is set to `true`.|No|`P1D`|
|`druid.coordinator.kill.audit.durationToRetain`|Duration, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format, for which audit logs are retained from their creation time. Only applies if `druid.coordinator.kill.audit.on` is set to `true`.|Yes, if `druid.coordinator.kill.audit.on` is set to `true`|None|
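For example, a Coordinator `runtime.properties` that enables this cleanup with an illustrative 30-day retention (the retention value here is an assumption, not a recommended default) might look like:

```properties
# Metadata management duties run on this period (default shown)
druid.coordinator.period.metadataStoreManagementPeriod=PT1H
# Enable automatic deletion of audit logs
druid.coordinator.kill.audit.on=true
# Attempt deletion at most once per day
druid.coordinator.kill.audit.period=P1D
# Keep 30 days of audit logs; must be set explicitly when the duty is enabled
druid.coordinator.kill.audit.durationToRetain=P30D
```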
##### Segment Management
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|


@@ -256,6 +256,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
|`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|
|`metadata/kill/audit/count`|Total number of audit logs automatically deleted from the metadata store audit table in each Coordinator kill audit duty run. This metric can help you adjust `druid.coordinator.kill.audit.durationToRetain` based on whether more or fewer audit logs should be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.|
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class
`org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have extra information on balancing


@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Binding annotation for the Set of Coordinator duties that manage the metadata store.
 */
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface CoordinatorMetadataStoreManagementDuty
{
}


@@ -41,6 +41,7 @@ import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
@@ -229,6 +230,24 @@ public class SQLAuditManager implements AuditManager
return fetchAuditHistoryLastEntries(null, type, limit);
}
@Override
public int removeAuditLogsOlderThan(final long timestamp)
{
DateTime dateTime = DateTimes.utc(timestamp);
return dbi.withHandle(
handle -> {
Update sql = handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE created_date < :date_time",
getAuditTable()
)
);
return sql.bind("date_time", dateTime.toString())
.execute();
}
);
}
private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, final String type, int limit)
throws IllegalArgumentException
{
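Note that the `DELETE` above binds the cutoff as a string rather than a SQL timestamp. This relies on the audit table storing `created_date` as an ISO-8601 UTC string (which is how `SQLAuditManager` writes it) and on fixed-width ISO-8601 UTC timestamps sorting lexicographically in chronological order. A minimal sketch of that property, with illustrative values:

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class IsoOrderingSketch
{
  public static void main(String[] args)
  {
    // Two illustrative audit timestamps, one day apart, rendered the same way
    // the audit insert path renders created_date (DateTime#toString in UTC).
    String older = new DateTime(2021, 4, 19, 0, 0, DateTimeZone.UTC).toString(); // 2021-04-19T00:00:00.000Z
    String newer = new DateTime(2021, 4, 20, 0, 0, DateTimeZone.UTC).toString(); // 2021-04-20T00:00:00.000Z

    // Lexicographic string comparison matches chronological order, which is
    // what makes "created_date < :date_time" behave as a time filter.
    System.out.println(older.compareTo(newer) < 0); // true
  }
}
```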


@@ -47,6 +47,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@@ -150,6 +151,7 @@ public class DruidCoordinator
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private final Set<CoordinatorDuty> indexingServiceDuties;
private final Set<CoordinatorDuty> metadataStoreManagementDuties;
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
@@ -164,6 +166,7 @@ public class DruidCoordinator
private ListeningExecutorService balancerExec;
private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
@@ -182,6 +185,7 @@ public class DruidCoordinator
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
@CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties,
@CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@@ -206,6 +210,7 @@ public class DruidCoordinator
self,
new ConcurrentHashMap<>(),
indexingServiceDuties,
metadataStoreManagementDuties,
factory,
lookupCoordinatorManager,
coordLeaderSelector,
@@ -230,6 +235,7 @@ public class DruidCoordinator
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<CoordinatorDuty> indexingServiceDuties,
Set<CoordinatorDuty> metadataStoreManagementDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
@@ -255,6 +261,7 @@ public class DruidCoordinator
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.indexingServiceDuties = indexingServiceDuties;
this.metadataStoreManagementDuties = metadataStoreManagementDuties;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@@ -665,6 +672,12 @@ public class DruidCoordinator
)
);
}
dutiesRunnables.add(
Pair.of(
new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
config.getCoordinatorMetadataStoreManagementPeriod()
)
);
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
// CompactSegmentsDuty can take a non-trivial amount of time to complete.
@@ -750,6 +763,19 @@ public class DruidCoordinator
return ImmutableList.copyOf(duties);
}
private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
{
List<CoordinatorDuty> duties = ImmutableList.<CoordinatorDuty>builder()
.addAll(metadataStoreManagementDuties)
.build();
log.debug(
"Done making metadata store management duties %s",
duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
);
return ImmutableList.copyOf(duties);
}
private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);


@@ -39,6 +39,10 @@ public abstract class DruidCoordinatorConfig
@Default("PT1800s")
public abstract Duration getCoordinatorIndexingPeriod();
@Config("druid.coordinator.period.metadataStoreManagementPeriod")
@Default("PT1H")
public abstract Duration getCoordinatorMetadataStoreManagementPeriod();
@Config("druid.coordinator.kill.period")
@Default("P1D")
public abstract Duration getCoordinatorKillPeriod();
@@ -51,6 +55,14 @@ public abstract class DruidCoordinatorConfig
@Default("0")
public abstract int getCoordinatorKillMaxSegments();
@Config("druid.coordinator.kill.audit.period")
@Default("P1D")
public abstract Duration getCoordinatorAuditKillPeriod();
@Config("druid.coordinator.kill.audit.durationToRetain")
@Default("PT-1s")
public abstract Duration getCoordinatorAuditKillDurationToRetain();
@Config("druid.coordinator.load.timeout")
public Duration getLoadTimeoutDelay()
{
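A note on the `PT-1s` default above: it appears to act as a sentinel. The `KillAuditLog` duty added in the next hunk rejects negative retention values, so leaving `druid.coordinator.kill.audit.durationToRetain` unset while the duty is enabled fails fast rather than silently picking a retention. A small sketch of that interaction (mirroring the value used in the new tests):

```java
import org.joda.time.Duration;

public class RetentionDefaultSketch
{
  public static void main(String[] args)
  {
    // The config default parses to a negative duration...
    Duration defaultRetention = new Duration("PT-1S");
    System.out.println(defaultRetention.getMillis()); // -1000

    // ...which fails KillAuditLog's "retainDuration must be >= 0" precondition,
    // effectively requiring operators to set durationToRetain explicitly
    // whenever druid.coordinator.kill.audit.on is true.
    System.out.println(defaultRetention.getMillis() >= 0); // false
  }
}
```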


@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
public class KillAuditLog implements CoordinatorDuty
{
private static final Logger log = new Logger(KillAuditLog.class);
private final long period;
private final long retainDuration;
private long lastKillTime = 0;
private final AuditManager auditManager;
@Inject
public KillAuditLog(
AuditManager auditManager,
DruidCoordinatorConfig config
)
{
this.period = config.getCoordinatorAuditKillPeriod().getMillis();
Preconditions.checkArgument(
this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
"coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
);
this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
log.debug(
"Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
this.period,
this.retainDuration
);
this.auditManager = auditManager;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
if ((lastKillTime + period) < System.currentTimeMillis()) {
lastKillTime = System.currentTimeMillis();
long timestamp = System.currentTimeMillis() - retainDuration;
int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
ServiceEmitter emitter = params.getEmitter();
emitter.emit(
new ServiceMetricEvent.Builder().build(
"metadata/kill/audit/count",
auditRemoved
)
);
log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
}
return params;
}
}
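To make the gating in `run()` concrete: the duty is invoked on every metadata-store-management cycle, but it only deletes once the configured kill period has elapsed since the last deletion. An illustrative sketch of the same arithmetic (the period and retention values below are assumptions, not defaults from this change):

```java
public class KillGateSketch
{
  public static void main(String[] args)
  {
    long killPeriodMillis = 24L * 60 * 60 * 1000;   // e.g. druid.coordinator.kill.audit.period = P1D
    long retainMillis = 30L * 24 * 60 * 60 * 1000;  // e.g. durationToRetain = P30D (illustrative)

    long lastKillTime = 0;                          // first run: always eligible
    long now = System.currentTimeMillis();

    // Same gate as KillAuditLog#run: skip unless a full kill period has passed.
    if (lastKillTime + killPeriodMillis < now) {
      long cutoff = now - retainMillis;
      // Here the duty would call auditManager.removeAuditLogsOlderThan(cutoff)
      // and emit "metadata/kill/audit/count" with the number of deleted rows.
      System.out.println("Would delete audit logs created before epoch ms " + cutoff);
    }
  }
}
```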


@@ -256,6 +256,83 @@ public class SQLAuditManagerTest
Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo());
}
@Test(timeout = 60_000L)
public void testRemoveAuditLogsOlderThanWithEntryOlderThanTime() throws IOException
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
byte[] payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
// Do delete
auditManager.removeAuditLogsOlderThan(System.currentTimeMillis());
// Verify the delete
payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
Assert.assertNull(payload);
}
@Test(timeout = 60_000L)
public void testRemoveAuditLogsOlderThanWithEntryNotOlderThanTime() throws IOException
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
byte[] payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
// Do delete
auditManager.removeAuditLogsOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis());
// Verify that the entry was not deleted
payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
}
@Test(timeout = 60_000L)
public void testFetchAuditHistoryByTypeWithLimit()
{


@@ -169,8 +169,11 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
10,
new Duration("PT0s")
);
@@ -247,6 +250,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
druidNode,
loadManagementPeons,
null,
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@@ -546,6 +550,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
druidNode,
loadManagementPeons,
null,
null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),


@@ -141,8 +141,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
10,
new Duration("PT0s")
);
@@ -212,6 +215,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
druidNode,
loadManagementPeons,
null,
new HashSet<>(),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@@ -738,6 +742,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
null,
null,
null,
ZkEnablementConfig.ENABLED
);


@@ -80,6 +80,9 @@ public class HttpLoadQueuePeonTest
null,
null,
null,
null,
null,
null,
10,
Duration.ZERO
)


@@ -95,6 +95,9 @@ public class LoadQueuePeonTest extends CuratorTestBase
null,
null,
null,
null,
null,
null,
10,
Duration.millis(0)
)
@@ -287,9 +290,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
null,
null,
null,
null,
new Duration(1),
null,
null,
null,
null,
10,
new Duration("PT1s")
)
@@ -339,9 +345,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
null,
null,
null,
null,
new Duration(1),
null,
null,
null,
null,
10,
new Duration("PT1s")
)


@@ -41,9 +41,12 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
null,
null,
null,
null,
new Duration(1),
null,
null,
null,
null,
10,
new Duration("PT1s")
)


@@ -27,9 +27,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration coordinatorStartDelay;
private final Duration coordinatorPeriod;
private final Duration coordinatorIndexingPeriod;
private final Duration metadataStoreManagementPeriod;
private final Duration loadTimeoutDelay;
private final Duration coordinatorKillPeriod;
private final Duration coordinatorKillDurationToRetain;
private final Duration coordinatorAuditKillPeriod;
private final Duration coordinatorAuditKillDurationToRetain;
private final Duration getLoadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
@@ -37,9 +40,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
Duration coordinatorStartDelay,
Duration coordinatorPeriod,
Duration coordinatorIndexingPeriod,
Duration metadataStoreManagementPeriod,
Duration loadTimeoutDelay,
Duration coordinatorKillPeriod,
Duration coordinatorKillDurationToRetain,
Duration coordinatorAuditKillPeriod,
Duration coordinatorAuditKillDurationToRetain,
int coordinatorKillMaxSegments,
Duration getLoadQueuePeonRepeatDelay
)
@@ -47,9 +53,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.coordinatorStartDelay = coordinatorStartDelay;
this.coordinatorPeriod = coordinatorPeriod;
this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
this.loadTimeoutDelay = loadTimeoutDelay;
this.coordinatorKillPeriod = coordinatorKillPeriod;
this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
}
@@ -72,6 +81,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return coordinatorIndexingPeriod;
}
@Override
public Duration getCoordinatorMetadataStoreManagementPeriod()
{
return metadataStoreManagementPeriod;
}
@Override
public Duration getCoordinatorKillPeriod()
{
@@ -84,6 +99,18 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return coordinatorKillDurationToRetain;
}
@Override
public Duration getCoordinatorAuditKillPeriod()
{
return coordinatorAuditKillPeriod;
}
@Override
public Duration getCoordinatorAuditKillDurationToRetain()
{
return coordinatorAuditKillDurationToRetain;
}
@Override
public int getCoordinatorKillMaxSegments()
{


@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class KillAuditLogTest
{
@Mock
private AuditManager mockAuditManager;
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
@Mock
private ServiceEmitter mockServiceEmitter;
@Rule
public ExpectedException exception = ExpectedException.none();
private KillAuditLog killAuditLog;
@Test
public void testRunSkipIfLastRunLessThanPeriod()
{
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration("PT5S"),
null,
null,
null,
new Duration(Long.MAX_VALUE),
new Duration("PT1S"),
10,
null
);
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
killAuditLog.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyZeroInteractions(mockAuditManager);
}
@Test
public void testRunNotSkipIfLastRunMoreThanPeriod()
{
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration("PT5S"),
null,
null,
null,
new Duration("PT6S"),
new Duration("PT1S"),
10,
null
);
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
killAuditLog.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
}
@Test
public void testConstructorFailIfInvalidPeriod()
{
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration("PT5S"),
null,
null,
null,
new Duration("PT3S"),
new Duration("PT1S"),
10,
null
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
}
@Test
public void testConstructorFailIfInvalidRetainDuration()
{
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration("PT5S"),
null,
null,
null,
new Duration("PT6S"),
new Duration("PT-1S"),
10,
null
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("coordinator audit kill retainDuration must be >= 0");
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
}
}


@@ -105,9 +105,12 @@ public class KillUnusedSegmentsTest
null,
null,
Duration.parse("PT76400S"),
null,
new Duration(1),
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
null,
null,
1000,
Duration.ZERO
)


@@ -46,6 +46,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -71,6 +72,7 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;
@@ -214,14 +216,14 @@ public class CliCoordinator extends ServerRunnable
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DataSourcesResource.class);
// Binding for Set of indexing service coordinator duties
final ConditionalMultibind<CoordinatorDuty> conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create(
properties,
binder,
CoordinatorDuty.class,
CoordinatorIndexingServiceDuty.class
);
if (conditionalIndexingServiceDutyMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
throw new UnsupportedOperationException(
"'druid.coordinator.merge.on' is not supported anymore. "
+ "Please consider using Coordinator's automatic compaction instead. "
@@ -230,19 +232,31 @@ public class CliCoordinator extends ServerRunnable
+ "for more details about compaction."
);
}
conditionalIndexingServiceDutyMultibind.addConditionBinding(
"druid.coordinator.kill.on",
Predicates.equalTo("true"),
KillUnusedSegments.class
);
conditionalIndexingServiceDutyMultibind.addConditionBinding(
"druid.coordinator.kill.pendingSegments.on",
"true",
Predicates.equalTo("true"),
KillStalePendingSegments.class
);
// Binding for Set of metadata store management coordinator duties
final ConditionalMultibind<CoordinatorDuty> conditionalMetadataStoreManagementDutyMultibind = ConditionalMultibind.create(
properties,
binder,
CoordinatorDuty.class,
CoordinatorMetadataStoreManagementDuty.class
);
conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
"druid.coordinator.kill.audit.on",
Predicates.equalTo("true"),
KillAuditLog.class
);
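For readers less familiar with Guice: the duties registered through this conditional multibind end up in the `@CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty>` injected into `DruidCoordinator`'s constructor (see the earlier hunk). A stripped-down, self-contained sketch of the same pattern, using a plain `Multibinder` rather than Druid's `ConditionalMultibind` and hypothetical stand-in types:

```java
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Set;

// Duties are multibound into a Set carrying a binding annotation; the consumer
// asks Guice for the annotated Set and iterates over whatever was registered.
public class DutyBindingSketch
{
  interface Duty
  {
    String name();
  }

  @BindingAnnotation
  @Retention(RetentionPolicy.RUNTIME)
  @interface MetadataStoreManagementDuty {}

  static class KillAuditLogLikeDuty implements Duty
  {
    @Override
    public String name()
    {
      return "killAuditLogs";
    }
  }

  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        // ConditionalMultibind adds the binding only when the configured
        // property (druid.coordinator.kill.audit.on) is "true"; this sketch
        // binds unconditionally for simplicity.
        Multibinder.newSetBinder(binder(), Duty.class, MetadataStoreManagementDuty.class)
                   .addBinding()
                   .to(KillAuditLogLikeDuty.class);
      }
    });

    Set<Duty> duties = injector.getInstance(
        Key.get(new TypeLiteral<Set<Duty>>() {}, MetadataStoreManagementDuty.class)
    );
    duties.forEach(duty -> System.out.println(duty.name())); // prints "killAuditLogs"
  }
}
```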
bindNodeRoleAndAnnouncer(
binder,
Coordinator.class,


@@ -1683,6 +1683,7 @@ PT1S
PT24H
PT300S
PT30S
PT3600S
PT5M
PT5S
PT60S