diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java b/core/src/main/java/org/apache/druid/audit/AuditManager.java index 73804d7da35..f403423c632 100644 --- a/core/src/main/java/org/apache/druid/audit/AuditManager.java +++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java @@ -91,4 +91,12 @@ public interface AuditManager * @return list of AuditEntries satisfying the passed parameters */ List fetchAuditHistory(String type, int limit); + + /** + * Remove audit logs created older than the given timestamp. + * + * @param timestamp timestamp in milliseconds + * @return number of audit logs removed + */ + int removeAuditLogsOlderThan(long timestamp); } diff --git a/docs/configuration/index.md b/docs/configuration/index.md index cd26a9e7afe..d49bf75675f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -742,6 +742,15 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator process should act like an Overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone Overlord processes. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false| |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord processes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| +##### Metadata Management + +|Property|Description|Required?|Default| +|--------|-----------|---------|-------| +|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run metadata management tasks in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. |No | `PT1H`| +|`druid.coordinator.kill.audit.on`| Boolean value for whether to enable automatic deletion of audit logs. If set to true, Coordinator will periodically remove audit logs from the audit table entries in metadata storage.| No | False| +|`druid.coordinator.kill.audit.period`| How often to do automatic deletion of audit logs in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.audit.on` is set to True.| No| `P1D`| +|`druid.coordinator.kill.audit.durationToRetain`| Duration of audit logs to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.audit.on` is set to True.| Yes if `druid.coordinator.kill.audit.on` is set to True| None| + ##### Segment Management |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 9a1d2c7e498..5a2d5fb76fd 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -256,6 +256,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.| |`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.| |`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.| +|`metadata/kill/audit/count`|Total number of audit logs automatically deleted from metadata store audit table per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on if more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.| + If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class `org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have extra information on balancing diff --git a/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java new file mode 100644 index 00000000000..47cde2b3394 --- /dev/null +++ b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface CoordinatorMetadataStoreManagementDuty +{ +} diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java index 13020436174..91a5a214e0b 100644 --- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java +++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java @@ -41,6 +41,7 @@ import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.Update; import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.IOException; @@ -229,6 +230,24 @@ public class SQLAuditManager implements AuditManager return fetchAuditHistoryLastEntries(null, type, limit); } + @Override + public int removeAuditLogsOlderThan(final long timestamp) + { + DateTime dateTime = DateTimes.utc(timestamp); + return dbi.withHandle( + handle -> { + Update sql = handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE created_date < :date_time", + getAuditTable() + ) + ); + return sql.bind("date_time", dateTime.toString()) + .execute(); + } + ); + } + private List fetchAuditHistoryLastEntries(final String key, final String type, int limit) throws IllegalArgumentException { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 17a3794de5f..7da4fe6b300 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -47,6 +47,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty; +import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -150,6 +151,7 @@ public class DruidCoordinator private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; private final Set indexingServiceDuties; + private final Set metadataStoreManagementDuties; private final BalancerStrategyFactory factory; private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidLeaderSelector coordLeaderSelector; @@ -164,6 +166,7 @@ public class DruidCoordinator private ListeningExecutorService balancerExec; private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties"; + private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties"; private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties"; private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties"; @@ -182,6 +185,7 @@ public class DruidCoordinator LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, @Self DruidNode self, + @CoordinatorMetadataStoreManagementDuty Set metadataStoreManagementDuties, @CoordinatorIndexingServiceDuty Set indexingServiceDuties, BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, @@ -206,6 +210,7 @@ public class DruidCoordinator self, new ConcurrentHashMap<>(), indexingServiceDuties, + metadataStoreManagementDuties, factory, lookupCoordinatorManager, coordLeaderSelector, @@ -230,6 +235,7 @@ public class DruidCoordinator DruidNode self, ConcurrentMap loadQueuePeonMap, Set indexingServiceDuties, + Set metadataStoreManagementDuties, BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, DruidLeaderSelector coordLeaderSelector, @@ -255,6 +261,7 @@ public class DruidCoordinator this.serviceAnnouncer = serviceAnnouncer; this.self = self; this.indexingServiceDuties = indexingServiceDuties; + this.metadataStoreManagementDuties = metadataStoreManagementDuties; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -665,6 +672,12 @@ public class DruidCoordinator ) ); } + dutiesRunnables.add( + Pair.of( + new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP), + config.getCoordinatorMetadataStoreManagementPeriod() + ) + ); for (final Pair dutiesRunnable : dutiesRunnables) { // CompactSegmentsDuty can takes a non trival amount of time to complete. @@ -750,6 +763,19 @@ public class DruidCoordinator return ImmutableList.copyOf(duties); } + private List makeMetadataStoreManagementDuties() + { + List duties = ImmutableList.builder() + .addAll(metadataStoreManagementDuties) + .build(); + + log.debug( + "Done making metadata store management duties %s", + duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) + ); + return ImmutableList.copyOf(duties); + } + private List makeCompactSegmentsDuty() { return ImmutableList.of(compactSegments); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 130d7e88d84..933b974d3ad 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -39,6 +39,10 @@ public abstract class DruidCoordinatorConfig @Default("PT1800s") public abstract Duration getCoordinatorIndexingPeriod(); + @Config("druid.coordinator.period.metadataStoreManagementPeriod") + @Default("PT1H") + public abstract Duration getCoordinatorMetadataStoreManagementPeriod(); + @Config("druid.coordinator.kill.period") @Default("P1D") public abstract Duration getCoordinatorKillPeriod(); @@ -51,6 +55,14 @@ public abstract class DruidCoordinatorConfig @Default("0") public abstract int getCoordinatorKillMaxSegments(); + @Config("druid.coordinator.kill.audit.period") + @Default("P1D") + public abstract Duration getCoordinatorAuditKillPeriod(); + + @Config("druid.coordinator.kill.audit.durationToRetain") + @Default("PT-1s") + public abstract Duration getCoordinatorAuditKillDurationToRetain(); + @Config("druid.coordinator.load.timeout") public Duration getLoadTimeoutDelay() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java new file mode 100644 index 00000000000..651cf6b60c9 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import org.apache.druid.audit.AuditManager; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; + +public class KillAuditLog implements CoordinatorDuty +{ + private static final Logger log = new Logger(KillAuditLog.class); + + private final long period; + private final long retainDuration; + private long lastKillTime = 0; + + private final AuditManager auditManager; + + @Inject + public KillAuditLog( + AuditManager auditManager, + DruidCoordinatorConfig config + ) + { + this.period = config.getCoordinatorAuditKillPeriod().getMillis(); + Preconditions.checkArgument( + this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), + "coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" + ); + this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis(); + Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0"); + log.debug( + "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]", + this.period, + this.retainDuration + ); + this.auditManager = auditManager; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + if ((lastKillTime + period) < System.currentTimeMillis()) { + lastKillTime = System.currentTimeMillis(); + + long timestamp = System.currentTimeMillis() - retainDuration; + int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/audit/count", + auditRemoved + ) + ); + log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); + } + return params; + } +} diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index 429402e9e9e..d336e84e6f1 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -128,15 +128,15 @@ public class SQLAuditManagerTest public void testAuditMetricEventBuilderConfig() { AuditEntry entry = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-01T00:00:00Z") + "testKey", + "testType", + new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ), + "testPayload", + DateTimes.of("2013-01-01T00:00:00Z") ); SQLAuditManager auditManagerWithPayloadAsDimension = new SQLAuditManager( @@ -256,6 +256,83 @@ public class SQLAuditManagerTest Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo()); } + @Test(timeout = 60_000L) + public void testRemoveAuditLogsOlderThanWithEntryOlderThanTime() throws IOException + { + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + String entry1Payload = "testPayload"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + byte[] payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); + + // Do delete + auditManager.removeAuditLogsOlderThan(System.currentTimeMillis()); + // Verify the delete + payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + Assert.assertNull(payload); + } + + @Test(timeout = 60_000L) + public void testRemoveAuditLogsOlderThanWithEntryNotOlderThanTime() throws IOException + { + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + String entry1Payload = "testPayload"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + byte[] payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); + // Do delete + auditManager.removeAuditLogsOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis()); + // Verify that entry was not delete + payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + dbEntry = mapper.readValue(payload, AuditEntry.class); + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); + } + @Test(timeout = 60_000L) public void testFetchAuditHistoryByTypeWithLimit() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index dd382e31a65..a8deb667dd0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -169,8 +169,11 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase new Duration(COORDINATOR_PERIOD), null, null, + null, new Duration(COORDINATOR_PERIOD), null, + null, + null, 10, new Duration("PT0s") ); @@ -247,6 +250,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase druidNode, loadManagementPeons, null, + null, new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), @@ -546,6 +550,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase druidNode, loadManagementPeons, null, + null, new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 36f1bf67f0b..3aaef647c41 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -141,8 +141,11 @@ public class DruidCoordinatorTest extends CuratorTestBase new Duration(COORDINATOR_PERIOD), null, null, + null, new Duration(COORDINATOR_PERIOD), null, + null, + null, 10, new Duration("PT0s") ); @@ -212,6 +215,7 @@ public class DruidCoordinatorTest extends CuratorTestBase druidNode, loadManagementPeons, null, + new HashSet<>(), new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), @@ -738,6 +742,7 @@ public class DruidCoordinatorTest extends CuratorTestBase null, null, null, + null, ZkEnablementConfig.ENABLED ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index 10d7ba24c40..7b07d7ffa61 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -80,6 +80,9 @@ public class HttpLoadQueuePeonTest null, null, null, + null, + null, + null, 10, Duration.ZERO ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 4a2a25f682c..24e68063b6a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -95,6 +95,9 @@ public class LoadQueuePeonTest extends CuratorTestBase null, null, null, + null, + null, + null, 10, Duration.millis(0) ) @@ -287,9 +290,12 @@ public class LoadQueuePeonTest extends CuratorTestBase null, null, null, + null, new Duration(1), null, null, + null, + null, 10, new Duration("PT1s") ) @@ -339,9 +345,12 @@ public class LoadQueuePeonTest extends CuratorTestBase null, null, null, + null, new Duration(1), null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java index fb9287320dc..5185452779c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java @@ -41,9 +41,12 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon null, null, null, + null, new Duration(1), null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index fef244a6296..135f8d0557e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -27,9 +27,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorStartDelay; private final Duration coordinatorPeriod; private final Duration coordinatorIndexingPeriod; + private final Duration metadataStoreManagementPeriod; private final Duration loadTimeoutDelay; private final Duration coordinatorKillPeriod; private final Duration coordinatorKillDurationToRetain; + private final Duration coordinatorAuditKillPeriod; + private final Duration coordinatorAuditKillDurationToRetain; private final Duration getLoadQueuePeonRepeatDelay; private final int coordinatorKillMaxSegments; @@ -37,9 +40,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig Duration coordinatorStartDelay, Duration coordinatorPeriod, Duration coordinatorIndexingPeriod, + Duration metadataStoreManagementPeriod, Duration loadTimeoutDelay, Duration coordinatorKillPeriod, Duration coordinatorKillDurationToRetain, + Duration coordinatorAuditKillPeriod, + Duration coordinatorAuditKillDurationToRetain, int coordinatorKillMaxSegments, Duration getLoadQueuePeonRepeatDelay ) @@ -47,9 +53,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig this.coordinatorStartDelay = coordinatorStartDelay; this.coordinatorPeriod = coordinatorPeriod; this.coordinatorIndexingPeriod = coordinatorIndexingPeriod; + this.metadataStoreManagementPeriod = metadataStoreManagementPeriod; this.loadTimeoutDelay = loadTimeoutDelay; this.coordinatorKillPeriod = coordinatorKillPeriod; this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain; + this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod; + this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; } @@ -72,6 +81,12 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig return coordinatorIndexingPeriod; } + @Override + public Duration getCoordinatorMetadataStoreManagementPeriod() + { + return metadataStoreManagementPeriod; + } + @Override public Duration getCoordinatorKillPeriod() { @@ -84,6 +99,18 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig return coordinatorKillDurationToRetain; } + @Override + public Duration getCoordinatorAuditKillPeriod() + { + return coordinatorAuditKillPeriod; + } + + @Override + public Duration getCoordinatorAuditKillDurationToRetain() + { + return coordinatorAuditKillDurationToRetain; + } + @Override public int getCoordinatorKillMaxSegments() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java new file mode 100644 index 00000000000..b0f273cba47 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.audit.AuditManager; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KillAuditLogTest +{ + @Mock + private AuditManager mockAuditManager; + + @Mock + private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; + + @Mock + private ServiceEmitter mockServiceEmitter; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private KillAuditLog killAuditLog; + + @Test + public void testRunSkipIfLastRunLessThanPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + new Duration(Long.MAX_VALUE), + new Duration("PT1S"), + 10, + null + ); + killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + killAuditLog.run(mockDruidCoordinatorRuntimeParams); + Mockito.verifyZeroInteractions(mockAuditManager); + } + + @Test + public void testRunNotSkipIfLastRunMoreThanPeriod() + { + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + killAuditLog.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong()); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } + + @Test + public void testConstructorFailIfInvalidPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + new Duration("PT3S"), + new Duration("PT1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); + killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + } + + @Test + public void testConstructorFailIfInvalidRetainDuration() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + new Duration("PT6S"), + new Duration("PT-1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("coordinator audit kill retainDuration must be >= 0"); + killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 38aa78e4467..cd766660dfb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -105,9 +105,12 @@ public class KillUnusedSegmentsTest null, null, Duration.parse("PT76400S"), + null, new Duration(1), Duration.parse("PT86400S"), Duration.parse("PT86400S"), + null, + null, 1000, Duration.ZERO ) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 1876a28836e..9e3c65b3cf1 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -46,6 +46,7 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty; +import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; import org.apache.druid.java.util.common.concurrent.Execs; @@ -71,6 +72,7 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.KillStalePendingSegments; import org.apache.druid.server.coordinator.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.duty.KillAuditLog; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.http.ClusterResource; import org.apache.druid.server.http.CompactionResource; @@ -214,14 +216,14 @@ public class CliCoordinator extends ServerRunnable LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, DataSourcesResource.class); - final ConditionalMultibind conditionalMultibind = ConditionalMultibind.create( + // Binding for Set of indexing service coordinator Ddty + final ConditionalMultibind conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create( properties, binder, CoordinatorDuty.class, CoordinatorIndexingServiceDuty.class ); - - if (conditionalMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) { + if (conditionalIndexingServiceDutyMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) { throw new UnsupportedOperationException( "'druid.coordinator.merge.on' is not supported anymore. " + "Please consider using Coordinator's automatic compaction instead. " @@ -230,19 +232,31 @@ public class CliCoordinator extends ServerRunnable + "for more details about compaction." ); } - - conditionalMultibind.addConditionBinding( + conditionalIndexingServiceDutyMultibind.addConditionBinding( "druid.coordinator.kill.on", Predicates.equalTo("true"), KillUnusedSegments.class ); - conditionalMultibind.addConditionBinding( + conditionalIndexingServiceDutyMultibind.addConditionBinding( "druid.coordinator.kill.pendingSegments.on", "true", Predicates.equalTo("true"), KillStalePendingSegments.class ); + // Binding for Set of metadata store management coordinator Ddty + final ConditionalMultibind conditionalMetadataStoreManagementDutyMultibind = ConditionalMultibind.create( + properties, + binder, + CoordinatorDuty.class, + CoordinatorMetadataStoreManagementDuty.class + ); + conditionalMetadataStoreManagementDutyMultibind.addConditionBinding( + "druid.coordinator.kill.audit.on", + Predicates.equalTo("true"), + KillAuditLog.class + ); + bindNodeRoleAndAnnouncer( binder, Coordinator.class, diff --git a/website/.spelling b/website/.spelling index bd2ecfa167c..315e5ffc9aa 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1683,6 +1683,7 @@ PT1S PT24H PT300S PT30S +PT3600S PT5M PT5S PT60S