From 016c881795c03fc6a107f46243ffc71854088e53 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Mon, 23 Jan 2023 13:23:45 -0800 Subject: [PATCH] Add API to return automatic compaction config history (#13699) Add a new API to return the history of changes to automatic compaction config history to make it easy for users to see what changes have been made to their auto-compaction config. The API is scoped per dataSource to allow users to triage issues with an individual dataSource. The API responds with a list of configs when there is a change to either the settings that impact all auto-compaction configs on a cluster or the dataSource in question. --- docs/operations/api-reference.md | 12 ++ .../DataSourceCompactionConfigAuditEntry.java | 124 ++++++++++++ .../DataSourceCompactionConfigHistory.java | 95 +++++++++ .../CoordinatorCompactionConfigsResource.java | 78 +++++++- ...aSourceCompactionConfigAuditEntryTest.java | 99 ++++++++++ ...DataSourceCompactionConfigHistoryTest.java | 185 ++++++++++++++++++ ...rdinatorCompactionConfigsResourceTest.java | 164 ++++++++++------ 7 files changed, 694 insertions(+), 63 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 687fa6e5c55..99d69d4246d 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -470,6 +470,18 @@ Returns all automatic compaction configs. Returns an automatic compaction config of a dataSource. +`GET /druid/coordinator/v1/config/compaction/{dataSource}/history?interval={interval}&count={count}` + +Returns the history of the automatic compaction config for a dataSource. Optionally accepts `interval` and `count` +query string parameters to filter by interval and limit the number of results respectively. If the dataSource does not +exist or there is no compaction history for the dataSource, a 404 response is returned. + +The response contains a list of objects with the following keys: +* `globalConfig`: A json object containing automatic compaction config that applies to the entire cluster. +* `compactionConfig`: A json object containing the automatic compaction config for the datasource. +* `auditInfo`: A json object that contains information about the change made - like `author`, `comment` and `ip`. +* `auditTime`: The date and time when the change was made. + `POST /druid/coordinator/v1/config/compaction/taskslots?ratio={someRatio}&max={someMaxSlots}` Update the capacity for compaction tasks. `ratio` and `max` are used to limit the max number of compaction tasks. diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java new file mode 100644 index 00000000000..3424212446f --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntry.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.audit.AuditInfo; +import org.joda.time.DateTime; + +/** + * A DTO containing audit information for compaction config for a datasource. + */ +public class DataSourceCompactionConfigAuditEntry +{ + private final GlobalCompactionConfig globalConfig; + private final DataSourceCompactionConfig compactionConfig; + private final AuditInfo auditInfo; + private final DateTime auditTime; + + @JsonCreator + public DataSourceCompactionConfigAuditEntry( + @JsonProperty("globalConfig") GlobalCompactionConfig globalConfig, + @JsonProperty("compactionConfig") DataSourceCompactionConfig compactionConfig, + @JsonProperty("auditInfo") AuditInfo auditInfo, + @JsonProperty("auditTime") DateTime auditTime + ) + { + this.globalConfig = globalConfig; + this.compactionConfig = compactionConfig; + this.auditInfo = auditInfo; + this.auditTime = auditTime; + } + + @JsonProperty + public GlobalCompactionConfig getGlobalConfig() + { + return globalConfig; + } + + @JsonProperty + public DataSourceCompactionConfig getCompactionConfig() + { + return compactionConfig; + } + + @JsonProperty + public AuditInfo getAuditInfo() + { + return auditInfo; + } + + @JsonProperty + public DateTime getAuditTime() + { + return auditTime; + } + + /** + * A DTO containing compaction config for that affects the entire cluster. + */ + public static class GlobalCompactionConfig + { + private final double compactionTaskSlotRatio; + private final int maxCompactionTaskSlots; + private final boolean useAutoScaleSlots; + + @JsonCreator + public GlobalCompactionConfig( + @JsonProperty("compactionTaskSlotRatio") + double compactionTaskSlotRatio, + @JsonProperty("maxCompactionTaskSlots") int maxCompactionTaskSlots, + @JsonProperty("useAutoScaleSlots") boolean useAutoScaleSlots + ) + { + this.compactionTaskSlotRatio = compactionTaskSlotRatio; + this.maxCompactionTaskSlots = maxCompactionTaskSlots; + this.useAutoScaleSlots = useAutoScaleSlots; + } + + @JsonProperty + public double getCompactionTaskSlotRatio() + { + return compactionTaskSlotRatio; + } + + @JsonProperty + public int getMaxCompactionTaskSlots() + { + return maxCompactionTaskSlots; + } + + @JsonProperty + public boolean isUseAutoScaleSlots() + { + return useAutoScaleSlots; + } + + @JsonIgnore + public boolean hasSameConfig(CoordinatorCompactionConfig coordinatorCompactionConfig) + { + return useAutoScaleSlots == coordinatorCompactionConfig.isUseAutoScaleSlots() && + compactionTaskSlotRatio == coordinatorCompactionConfig.getCompactionTaskSlotRatio() && + coordinatorCompactionConfig.getMaxCompactionTaskSlots() == maxCompactionTaskSlots; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java new file mode 100644 index 00000000000..ceb4be0d8a6 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.apache.druid.audit.AuditInfo; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Stack; + +/** + * A utility class to build the config history for a datasource from audit entries for + * {@link CoordinatorCompactionConfig}. The {@link CoordinatorCompactionConfig} contains the entire config for the + * cluster, so this class creates adds audit entires to the history only when a setting for this datasource or a global + * setting has changed. + */ +public class DataSourceCompactionConfigHistory +{ + private final Stack auditEntries = new Stack<>(); + private final String dataSource; + + public DataSourceCompactionConfigHistory(String dataSource) + { + this.dataSource = dataSource; + } + + public void add(CoordinatorCompactionConfig coordinatorCompactionConfig, AuditInfo auditInfo, DateTime auditTime) + { + DataSourceCompactionConfigAuditEntry current = auditEntries.isEmpty() ? null : auditEntries.peek(); + DataSourceCompactionConfigAuditEntry newEntry = null; + boolean hasDataSourceCompactionConfig = false; + for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { + if (dataSource.equals(dataSourceCompactionConfig.getDataSource())) { + hasDataSourceCompactionConfig = true; + if ( + current == null || + ( + !dataSourceCompactionConfig.equals(current.getCompactionConfig()) || + !current.getGlobalConfig().hasSameConfig(coordinatorCompactionConfig) + ) + ) { + current = new DataSourceCompactionConfigAuditEntry( + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + coordinatorCompactionConfig.getCompactionTaskSlotRatio(), + coordinatorCompactionConfig.getMaxCompactionTaskSlots(), + coordinatorCompactionConfig.isUseAutoScaleSlots() + ), + dataSourceCompactionConfig, + auditInfo, + auditTime + ); + newEntry = current; + } + break; + } + } + if (newEntry != null) { + auditEntries.push(newEntry); + } else if (current != null && !hasDataSourceCompactionConfig) { + newEntry = new DataSourceCompactionConfigAuditEntry( + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + coordinatorCompactionConfig.getCompactionTaskSlotRatio(), + coordinatorCompactionConfig.getMaxCompactionTaskSlots(), + coordinatorCompactionConfig.isUseAutoScaleSlots() + ), + null, + auditInfo, + auditTime + ); + auditEntries.push(newEntry); + } + } + + public List getHistory() + { + return auditEntries; + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index e791b6a49df..743654ee88d 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -19,21 +19,28 @@ package org.apache.druid.server.http; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.common.utils.ServletResourceUtils; +import org.apache.druid.guice.annotations.JsonNonNull; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory; import org.apache.druid.server.http.security.ConfigResourceFilter; +import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -49,6 +56,9 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.Callable; @@ -67,17 +77,23 @@ public class CoordinatorCompactionConfigsResource private final JacksonConfigManager manager; private final MetadataStorageConnector connector; private final MetadataStorageTablesConfig connectorConfig; + private final AuditManager auditManager; + private final ObjectMapper jsonMapperOnlyNonNullValue; @Inject public CoordinatorCompactionConfigsResource( JacksonConfigManager manager, MetadataStorageConnector connector, - MetadataStorageTablesConfig connectorConfig + MetadataStorageTablesConfig connectorConfig, + AuditManager auditManager, + @JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue ) { this.manager = manager; this.connector = connector; this.connectorConfig = connectorConfig; + this.auditManager = auditManager; + this.jsonMapperOnlyNonNullValue = jsonMapperOnlyNonNullValue; } @GET @@ -101,7 +117,10 @@ public class CoordinatorCompactionConfigsResource { Callable callable = () -> { final byte[] currentBytes = getCurrentConfigInByteFromDb(); - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig( + manager, + currentBytes + ); final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from( current, compactionTaskSlotRatio, @@ -130,7 +149,10 @@ public class CoordinatorCompactionConfigsResource { Callable callable = () -> { final byte[] currentBytes = getCurrentConfigInByteFromDb(); - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig( + manager, + currentBytes + ); final CoordinatorCompactionConfig newCompactionConfig; final Map newConfigs = current .getCompactionConfigs() @@ -168,6 +190,51 @@ public class CoordinatorCompactionConfigsResource return Response.ok().entity(config).build(); } + @GET + @Path("/{dataSource}/history") + @Produces(MediaType.APPLICATION_JSON) + public Response getCompactionConfigHistory( + @PathParam("dataSource") String dataSource, + @QueryParam("interval") String interval, + @QueryParam("count") Integer count + ) + { + Interval theInterval = interval == null ? null : Intervals.of(interval); + try { + List auditEntries; + if (theInterval == null && count != null) { + auditEntries = auditManager.fetchAuditHistory( + CoordinatorCompactionConfig.CONFIG_KEY, + CoordinatorCompactionConfig.CONFIG_KEY, + count + ); + } else { + auditEntries = auditManager.fetchAuditHistory( + CoordinatorCompactionConfig.CONFIG_KEY, + CoordinatorCompactionConfig.CONFIG_KEY, + theInterval + ); + } + DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory(dataSource); + for (AuditEntry audit : auditEntries) { + CoordinatorCompactionConfig coordinatorCompactionConfig = CoordinatorCompactionConfig.convertByteToConfig( + manager, + audit.getPayload().getBytes(StandardCharsets.UTF_8) + ); + history.add(coordinatorCompactionConfig, audit.getAuditInfo(), audit.getAuditTime()); + } + if (history.getHistory().isEmpty()) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + return Response.ok(history.getHistory()).build(); + } + catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ServletResourceUtils.sanitizeException(e)) + .build(); + } + } + @DELETE @Path("/{dataSource}") @Produces(MediaType.APPLICATION_JSON) @@ -180,7 +247,10 @@ public class CoordinatorCompactionConfigsResource { Callable callable = () -> { final byte[] currentBytes = getCurrentConfigInByteFromDb(); - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig( + manager, + currentBytes + ); final Map configs = current .getCompactionConfigs() .stream() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java new file mode 100644 index 00000000000..365c872165e --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DataSourceCompactionConfigAuditEntryTest +{ + private static final double COMPACTION_TASK_SLOT_RATIO = 0.1; + private static final int MAX_COMPACTION_SLOTS = 9; + private static final boolean USE_AUTO_SCALE_SLOTS = true; + + @Mock + private CoordinatorCompactionConfig coordinatorCompactionConfig; + + @Before + public void setUp() + { + Mockito.when(coordinatorCompactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO); + Mockito.when(coordinatorCompactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_SLOTS); + Mockito.when(coordinatorCompactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS); + } + + @Test + public void testhasSameConfigWithSameBaseConfigShouldReturnTrue() + { + DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + COMPACTION_TASK_SLOT_RATIO, + MAX_COMPACTION_SLOTS, + USE_AUTO_SCALE_SLOTS + ); + + Assert.assertTrue(config.hasSameConfig(coordinatorCompactionConfig)); + } + + @Test + public void testhasSameConfigWithDifferentUseAutoScaleSlotsShouldReturnFalse() + { + DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + COMPACTION_TASK_SLOT_RATIO, + MAX_COMPACTION_SLOTS, + !USE_AUTO_SCALE_SLOTS + ); + + Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + } + + @Test + public void testhasSameConfigWithDifferentMaxCompactionSlotsShouldReturnFalse() + { + DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + COMPACTION_TASK_SLOT_RATIO, + MAX_COMPACTION_SLOTS + 1, + USE_AUTO_SCALE_SLOTS + ); + + Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + } + + @Test + public void testhasSameConfigWithDifferentCompactionSlotRatioShouldReturnFalse() + { + DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config = + new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig( + COMPACTION_TASK_SLOT_RATIO - 0.03, + MAX_COMPACTION_SLOTS, + USE_AUTO_SCALE_SLOTS + ); + + Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig)); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java new file mode 100644 index 00000000000..bf4badb44db --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.audit.AuditInfo; +import org.apache.druid.java.util.common.DateTimes; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DataSourceCompactionConfigHistoryTest +{ + private static final String DATASOURCE = "DATASOURCE"; + private static final String DATASOURCE_2 = "DATASOURCE_2"; + private static final String DATASOURCE_NOT_EXISTS = "DATASOURCE_NOT_EXISTS"; + private static final double COMPACTION_TASK_SLOT_RATIO = 0.1; + private static final int MAX_COMPACTION_TASK_SLOTS = 9; + private static final boolean USE_AUTO_SCALE_SLOTS = false; + private static final DateTime AUDIT_TIME = DateTimes.of(2023, 1, 13, 9, 0); + private static final DateTime AUDIT_TIME_2 = DateTimes.of(2023, 1, 13, 9, 30); + private static final DateTime AUDIT_TIME_3 = DateTimes.of(2023, 1, 13, 10, 0); + + @Mock + private CoordinatorCompactionConfig compactionConfig; + @Mock(answer = Answers.RETURNS_MOCKS) + private DataSourceCompactionConfig configForDataSource; + @Mock(answer = Answers.RETURNS_MOCKS) + private DataSourceCompactionConfig configForDataSourceWithChange; + @Mock(answer = Answers.RETURNS_MOCKS) + private DataSourceCompactionConfig configForDataSource2; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private AuditInfo auditInfo; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private AuditInfo auditInfo2; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private AuditInfo auditInfo3; + + private DataSourceCompactionConfigHistory target; + + @Before + public void setUp() + { + Mockito.when(compactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO); + Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_TASK_SLOTS); + Mockito.when(compactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS); + Mockito.when(configForDataSource.getDataSource()).thenReturn(DATASOURCE); + Mockito.when(configForDataSourceWithChange.getDataSource()).thenReturn(DATASOURCE); + Mockito.when(configForDataSource2.getDataSource()).thenReturn(DATASOURCE_2); + Mockito.when(compactionConfig.getCompactionConfigs()) + .thenReturn(ImmutableList.of(configForDataSource, configForDataSource2)); + target = new DataSourceCompactionConfigHistory(DATASOURCE); + } + + @Test + public void testAddCompactionConfigShouldAddToHistory() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Assert.assertEquals(1, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + } + + @Test + public void testAddAndDeleteCompactionConfigShouldAddBothToHistory() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2)); + target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); + Assert.assertEquals(2, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + auditEntry = target.getHistory().get(1); + Assert.assertEquals(null, auditEntry.getCompactionConfig()); + Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); + } + + @Test + public void testAddAndDeleteAnotherCompactionConfigShouldNotAddToHistory() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource)); + target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); + Assert.assertEquals(1, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + } + + @Test + public void testAddDeletedAddCompactionConfigShouldAddAllToHistory() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2)); + target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); + Mockito.when(compactionConfig.getCompactionConfigs()) + .thenReturn(ImmutableList.of(configForDataSourceWithChange, configForDataSource2)); + target.add(compactionConfig, auditInfo3, AUDIT_TIME_3); + Assert.assertEquals(3, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + auditEntry = target.getHistory().get(2); + Assert.assertEquals(configForDataSourceWithChange, auditEntry.getCompactionConfig()); + Assert.assertEquals(auditInfo3, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME_3, auditEntry.getAuditTime()); + } + + @Test + public void testAddAndChangeCompactionConfigShouldAddBothToHistory() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSourceWithChange)); + target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); + Assert.assertEquals(2, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + auditEntry = target.getHistory().get(1); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); + } + + @Test + public void testAddAndChangeGlobalSettingsShouldAddTwice() + { + target.add(compactionConfig, auditInfo, AUDIT_TIME); + int newMaxTaskSlots = MAX_COMPACTION_TASK_SLOTS - 1; + Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(newMaxTaskSlots); + target.add(compactionConfig, auditInfo2, AUDIT_TIME_2); + Assert.assertEquals(2, target.getHistory().size()); + DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime()); + Assert.assertEquals(MAX_COMPACTION_TASK_SLOTS, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots()); + auditEntry = target.getHistory().get(1); + Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource()); + Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo()); + Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime()); + Assert.assertEquals(newMaxTaskSlots, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots()); + } + + @Test + public void testAddCompactionConfigDoesNotHaveDataSourceWithNoHistoryShouldNotAdd() + { + target = new DataSourceCompactionConfigHistory(DATASOURCE_NOT_EXISTS); + target.add(compactionConfig, auditInfo, AUDIT_TIME); + Assert.assertTrue(target.getHistory().isEmpty()); + } + +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 6a38d5bcc65..5159d86594e 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -22,8 +22,10 @@ package org.apache.druid.server.http; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; @@ -64,7 +66,10 @@ public class CoordinatorCompactionConfigsResourceTest ); private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3}; - private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG)); + private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of( + OLD_CONFIG)); + + private static final String DATASOURCE_NOT_EXISTS = "notExists"; @Mock private JacksonConfigManager mockJacksonConfigManager; @@ -78,27 +83,40 @@ public class CoordinatorCompactionConfigsResourceTest @Mock private MetadataStorageTablesConfig mockConnectorConfig; + @Mock + private AuditManager mockAuditManager; + private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource; @Before public void setup() { Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ) ).thenReturn(OLD_CONFIG_IN_BYTES); Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), + ArgumentMatchers.eq(CoordinatorCompactionConfig.class), + ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ) ).thenReturn(ORIGINAL_CONFIG); Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config"); + Mockito.when(mockAuditManager.fetchAuditHistory( + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + ArgumentMatchers.any() + ) + ).thenReturn(ImmutableList.of()); coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource( mockJacksonConfigManager, mockConnector, - mockConnectorConfig + mockConnectorConfig, + mockAuditManager, + new DefaultObjectMapper() ); Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123"); } @@ -107,12 +125,14 @@ public class CoordinatorCompactionConfigsResourceTest public void testSetCompactionTaskLimitWithExistingConfig() { final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( + CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any()) + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + oldConfigCaptor.capture(), + newConfigCaptor.capture(), + ArgumentMatchers.any() + ) ).thenReturn(ConfigManager.SetResult.ok()); double compactionTaskSlotRatio = 0.5; @@ -140,12 +160,14 @@ public class CoordinatorCompactionConfigsResourceTest public void testAddOrUpdateCompactionConfigWithExistingConfig() { final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( + CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any()) + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + oldConfigCaptor.capture(), + newConfigCaptor.capture(), + ArgumentMatchers.any() + ) ).thenReturn(ConfigManager.SetResult.ok()); final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( @@ -183,12 +205,14 @@ public class CoordinatorCompactionConfigsResourceTest public void testDeleteCompactionConfigWithExistingConfig() { final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( + CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any()) + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + oldConfigCaptor.capture(), + newConfigCaptor.capture(), + ArgumentMatchers.any() + ) ).thenReturn(ConfigManager.SetResult.ok()); final String datasourceName = "dataSource"; final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig( @@ -207,9 +231,10 @@ public class CoordinatorCompactionConfigsResourceTest ); final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete)); Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), + ArgumentMatchers.eq(CoordinatorCompactionConfig.class), + ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ) ).thenReturn(originalConfig); String author = "maytas"; @@ -255,23 +280,27 @@ public class CoordinatorCompactionConfigsResourceTest public void testSetCompactionTaskLimitWithoutExistingConfig() { Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ) ).thenReturn(null); Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(null), + ArgumentMatchers.eq(CoordinatorCompactionConfig.class), + ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ) ).thenReturn(CoordinatorCompactionConfig.empty()); final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( + CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any()) + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + oldConfigCaptor.capture(), + newConfigCaptor.capture(), + ArgumentMatchers.any() + ) ).thenReturn(ConfigManager.SetResult.ok()); double compactionTaskSlotRatio = 0.5; @@ -298,23 +327,27 @@ public class CoordinatorCompactionConfigsResourceTest public void testAddOrUpdateCompactionConfigWithoutExistingConfig() { Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ) ).thenReturn(null); Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(null), + ArgumentMatchers.eq(CoordinatorCompactionConfig.class), + ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ) ).thenReturn(CoordinatorCompactionConfig.empty()); final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); - final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass( + CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - oldConfigCaptor.capture(), - newConfigCaptor.capture(), - ArgumentMatchers.any()) + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + oldConfigCaptor.capture(), + newConfigCaptor.capture(), + ArgumentMatchers.any() + ) ).thenReturn(ConfigManager.SetResult.ok()); final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig( @@ -350,24 +383,37 @@ public class CoordinatorCompactionConfigsResourceTest public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist() { Mockito.when(mockConnector.lookup( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq("name"), - ArgumentMatchers.eq("payload"), - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ) ).thenReturn(null); Mockito.when(mockJacksonConfigManager.convertByteToConfig( - ArgumentMatchers.eq(null), - ArgumentMatchers.eq(CoordinatorCompactionConfig.class), - ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) + ArgumentMatchers.eq(null), + ArgumentMatchers.eq(CoordinatorCompactionConfig.class), + ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) + ) ).thenReturn(CoordinatorCompactionConfig.empty()); String author = "maytas"; String comment = "hello"; Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig( - "notExist", + DATASOURCE_NOT_EXISTS, author, comment, mockHttpServletRequest ); Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), result.getStatus()); } + + @Test + public void testGetCompactionConfigHistoryForUnknownDataSourceShouldReturnNotFound() + { + Response response = coordinatorCompactionConfigsResource.getCompactionConfigHistory( + DATASOURCE_NOT_EXISTS, + null, + null + ); + Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); + } }