Add API to return automatic compaction config history (#13699)

Add a new API to return the history of changes to automatic compaction config history to make it easy for users to see what changes have been made to their auto-compaction config.

The API is scoped per dataSource to allow users to triage issues with an individual dataSource. The API responds with a list of configs when there is a change to either the settings that impact all auto-compaction configs on a cluster or the dataSource in question.
This commit is contained in:
Suneet Saldanha 2023-01-23 13:23:45 -08:00 committed by GitHub
parent 90d445536d
commit 016c881795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 694 additions and 63 deletions

View File

@ -470,6 +470,18 @@ Returns all automatic compaction configs.
Returns an automatic compaction config of a dataSource.
`GET /druid/coordinator/v1/config/compaction/{dataSource}/history?interval={interval}&count={count}`
Returns the history of the automatic compaction config for a dataSource. Optionally accepts `interval` and `count`
query string parameters to filter by interval and limit the number of results respectively. If the dataSource does not
exist or there is no compaction history for the dataSource, a 404 response is returned.
The response contains a list of objects with the following keys:
* `globalConfig`: A json object containing automatic compaction config that applies to the entire cluster.
* `compactionConfig`: A json object containing the automatic compaction config for the datasource.
* `auditInfo`: A json object that contains information about the change made - like `author`, `comment` and `ip`.
* `auditTime`: The date and time when the change was made.
`POST /druid/coordinator/v1/config/compaction/taskslots?ratio={someRatio}&max={someMaxSlots}`
Update the capacity for compaction tasks. `ratio` and `max` are used to limit the max number of compaction tasks.

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.audit.AuditInfo;
import org.joda.time.DateTime;
/**
* A DTO containing audit information for compaction config for a datasource.
*/
public class DataSourceCompactionConfigAuditEntry
{
private final GlobalCompactionConfig globalConfig;
private final DataSourceCompactionConfig compactionConfig;
private final AuditInfo auditInfo;
private final DateTime auditTime;
@JsonCreator
public DataSourceCompactionConfigAuditEntry(
@JsonProperty("globalConfig") GlobalCompactionConfig globalConfig,
@JsonProperty("compactionConfig") DataSourceCompactionConfig compactionConfig,
@JsonProperty("auditInfo") AuditInfo auditInfo,
@JsonProperty("auditTime") DateTime auditTime
)
{
this.globalConfig = globalConfig;
this.compactionConfig = compactionConfig;
this.auditInfo = auditInfo;
this.auditTime = auditTime;
}
@JsonProperty
public GlobalCompactionConfig getGlobalConfig()
{
return globalConfig;
}
@JsonProperty
public DataSourceCompactionConfig getCompactionConfig()
{
return compactionConfig;
}
@JsonProperty
public AuditInfo getAuditInfo()
{
return auditInfo;
}
@JsonProperty
public DateTime getAuditTime()
{
return auditTime;
}
/**
* A DTO containing compaction config for that affects the entire cluster.
*/
public static class GlobalCompactionConfig
{
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
@JsonCreator
public GlobalCompactionConfig(
@JsonProperty("compactionTaskSlotRatio")
double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") int maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") boolean useAutoScaleSlots
)
{
this.compactionTaskSlotRatio = compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots;
}
@JsonProperty
public double getCompactionTaskSlotRatio()
{
return compactionTaskSlotRatio;
}
@JsonProperty
public int getMaxCompactionTaskSlots()
{
return maxCompactionTaskSlots;
}
@JsonProperty
public boolean isUseAutoScaleSlots()
{
return useAutoScaleSlots;
}
@JsonIgnore
public boolean hasSameConfig(CoordinatorCompactionConfig coordinatorCompactionConfig)
{
return useAutoScaleSlots == coordinatorCompactionConfig.isUseAutoScaleSlots() &&
compactionTaskSlotRatio == coordinatorCompactionConfig.getCompactionTaskSlotRatio() &&
coordinatorCompactionConfig.getMaxCompactionTaskSlots() == maxCompactionTaskSlots;
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.apache.druid.audit.AuditInfo;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Stack;
/**
* A utility class to build the config history for a datasource from audit entries for
* {@link CoordinatorCompactionConfig}. The {@link CoordinatorCompactionConfig} contains the entire config for the
* cluster, so this class creates adds audit entires to the history only when a setting for this datasource or a global
* setting has changed.
*/
public class DataSourceCompactionConfigHistory
{
private final Stack<DataSourceCompactionConfigAuditEntry> auditEntries = new Stack<>();
private final String dataSource;
public DataSourceCompactionConfigHistory(String dataSource)
{
this.dataSource = dataSource;
}
public void add(CoordinatorCompactionConfig coordinatorCompactionConfig, AuditInfo auditInfo, DateTime auditTime)
{
DataSourceCompactionConfigAuditEntry current = auditEntries.isEmpty() ? null : auditEntries.peek();
DataSourceCompactionConfigAuditEntry newEntry = null;
boolean hasDataSourceCompactionConfig = false;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSource.equals(dataSourceCompactionConfig.getDataSource())) {
hasDataSourceCompactionConfig = true;
if (
current == null ||
(
!dataSourceCompactionConfig.equals(current.getCompactionConfig()) ||
!current.getGlobalConfig().hasSameConfig(coordinatorCompactionConfig)
)
) {
current = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
dataSourceCompactionConfig,
auditInfo,
auditTime
);
newEntry = current;
}
break;
}
}
if (newEntry != null) {
auditEntries.push(newEntry);
} else if (current != null && !hasDataSourceCompactionConfig) {
newEntry = new DataSourceCompactionConfigAuditEntry(
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
coordinatorCompactionConfig.isUseAutoScaleSlots()
),
null,
auditInfo,
auditTime
);
auditEntries.push(newEntry);
}
}
public List<DataSourceCompactionConfigAuditEntry> getHistory()
{
return auditEntries;
}
}

View File

@ -19,21 +19,28 @@
package org.apache.druid.server.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.utils.ServletResourceUtils;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@ -49,6 +56,9 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
@ -67,17 +77,23 @@ public class CoordinatorCompactionConfigsResource
private final JacksonConfigManager manager;
private final MetadataStorageConnector connector;
private final MetadataStorageTablesConfig connectorConfig;
private final AuditManager auditManager;
private final ObjectMapper jsonMapperOnlyNonNullValue;
@Inject
public CoordinatorCompactionConfigsResource(
JacksonConfigManager manager,
MetadataStorageConnector connector,
MetadataStorageTablesConfig connectorConfig
MetadataStorageTablesConfig connectorConfig,
AuditManager auditManager,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue
)
{
this.manager = manager;
this.connector = connector;
this.connectorConfig = connectorConfig;
this.auditManager = auditManager;
this.jsonMapperOnlyNonNullValue = jsonMapperOnlyNonNullValue;
}
@GET
@ -101,7 +117,10 @@ public class CoordinatorCompactionConfigsResource
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
@ -130,7 +149,10 @@ public class CoordinatorCompactionConfigsResource
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final CoordinatorCompactionConfig newCompactionConfig;
final Map<String, DataSourceCompactionConfig> newConfigs = current
.getCompactionConfigs()
@ -168,6 +190,51 @@ public class CoordinatorCompactionConfigsResource
return Response.ok().entity(config).build();
}
@GET
@Path("/{dataSource}/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompactionConfigHistory(
@PathParam("dataSource") String dataSource,
@QueryParam("interval") String interval,
@QueryParam("count") Integer count
)
{
Interval theInterval = interval == null ? null : Intervals.of(interval);
try {
List<AuditEntry> auditEntries;
if (theInterval == null && count != null) {
auditEntries = auditManager.fetchAuditHistory(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
count
);
} else {
auditEntries = auditManager.fetchAuditHistory(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.CONFIG_KEY,
theInterval
);
}
DataSourceCompactionConfigHistory history = new DataSourceCompactionConfigHistory(dataSource);
for (AuditEntry audit : auditEntries) {
CoordinatorCompactionConfig coordinatorCompactionConfig = CoordinatorCompactionConfig.convertByteToConfig(
manager,
audit.getPayload().getBytes(StandardCharsets.UTF_8)
);
history.add(coordinatorCompactionConfig, audit.getAuditInfo(), audit.getAuditTime());
}
if (history.getHistory().isEmpty()) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok(history.getHistory()).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(e))
.build();
}
}
@DELETE
@Path("/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
@ -180,7 +247,10 @@ public class CoordinatorCompactionConfigsResource
{
Callable<SetResult> callable = () -> {
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(
manager,
currentBytes
);
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DataSourceCompactionConfigAuditEntryTest
{
private static final double COMPACTION_TASK_SLOT_RATIO = 0.1;
private static final int MAX_COMPACTION_SLOTS = 9;
private static final boolean USE_AUTO_SCALE_SLOTS = true;
@Mock
private CoordinatorCompactionConfig coordinatorCompactionConfig;
@Before
public void setUp()
{
Mockito.when(coordinatorCompactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO);
Mockito.when(coordinatorCompactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_SLOTS);
Mockito.when(coordinatorCompactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS);
}
@Test
public void testhasSameConfigWithSameBaseConfigShouldReturnTrue()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS,
USE_AUTO_SCALE_SLOTS
);
Assert.assertTrue(config.hasSameConfig(coordinatorCompactionConfig));
}
@Test
public void testhasSameConfigWithDifferentUseAutoScaleSlotsShouldReturnFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS,
!USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
}
@Test
public void testhasSameConfigWithDifferentMaxCompactionSlotsShouldReturnFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO,
MAX_COMPACTION_SLOTS + 1,
USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
}
@Test
public void testhasSameConfigWithDifferentCompactionSlotRatioShouldReturnFalse()
{
DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig config =
new DataSourceCompactionConfigAuditEntry.GlobalCompactionConfig(
COMPACTION_TASK_SLOT_RATIO - 0.03,
MAX_COMPACTION_SLOTS,
USE_AUTO_SCALE_SLOTS
);
Assert.assertFalse(config.hasSameConfig(coordinatorCompactionConfig));
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DataSourceCompactionConfigHistoryTest
{
private static final String DATASOURCE = "DATASOURCE";
private static final String DATASOURCE_2 = "DATASOURCE_2";
private static final String DATASOURCE_NOT_EXISTS = "DATASOURCE_NOT_EXISTS";
private static final double COMPACTION_TASK_SLOT_RATIO = 0.1;
private static final int MAX_COMPACTION_TASK_SLOTS = 9;
private static final boolean USE_AUTO_SCALE_SLOTS = false;
private static final DateTime AUDIT_TIME = DateTimes.of(2023, 1, 13, 9, 0);
private static final DateTime AUDIT_TIME_2 = DateTimes.of(2023, 1, 13, 9, 30);
private static final DateTime AUDIT_TIME_3 = DateTimes.of(2023, 1, 13, 10, 0);
@Mock
private CoordinatorCompactionConfig compactionConfig;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSource;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSourceWithChange;
@Mock(answer = Answers.RETURNS_MOCKS)
private DataSourceCompactionConfig configForDataSource2;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo2;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AuditInfo auditInfo3;
private DataSourceCompactionConfigHistory target;
@Before
public void setUp()
{
Mockito.when(compactionConfig.getCompactionTaskSlotRatio()).thenReturn(COMPACTION_TASK_SLOT_RATIO);
Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(MAX_COMPACTION_TASK_SLOTS);
Mockito.when(compactionConfig.isUseAutoScaleSlots()).thenReturn(USE_AUTO_SCALE_SLOTS);
Mockito.when(configForDataSource.getDataSource()).thenReturn(DATASOURCE);
Mockito.when(configForDataSourceWithChange.getDataSource()).thenReturn(DATASOURCE);
Mockito.when(configForDataSource2.getDataSource()).thenReturn(DATASOURCE_2);
Mockito.when(compactionConfig.getCompactionConfigs())
.thenReturn(ImmutableList.of(configForDataSource, configForDataSource2));
target = new DataSourceCompactionConfigHistory(DATASOURCE);
}
@Test
public void testAddCompactionConfigShouldAddToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Assert.assertEquals(1, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
}
@Test
public void testAddAndDeleteCompactionConfigShouldAddBothToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(null, auditEntry.getCompactionConfig());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
}
@Test
public void testAddAndDeleteAnotherCompactionConfigShouldNotAddToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(1, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
}
@Test
public void testAddDeletedAddCompactionConfigShouldAddAllToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSource2));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Mockito.when(compactionConfig.getCompactionConfigs())
.thenReturn(ImmutableList.of(configForDataSourceWithChange, configForDataSource2));
target.add(compactionConfig, auditInfo3, AUDIT_TIME_3);
Assert.assertEquals(3, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(2);
Assert.assertEquals(configForDataSourceWithChange, auditEntry.getCompactionConfig());
Assert.assertEquals(auditInfo3, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_3, auditEntry.getAuditTime());
}
@Test
public void testAddAndChangeCompactionConfigShouldAddBothToHistory()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Mockito.when(compactionConfig.getCompactionConfigs()).thenReturn(ImmutableList.of(configForDataSourceWithChange));
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
}
@Test
public void testAddAndChangeGlobalSettingsShouldAddTwice()
{
target.add(compactionConfig, auditInfo, AUDIT_TIME);
int newMaxTaskSlots = MAX_COMPACTION_TASK_SLOTS - 1;
Mockito.when(compactionConfig.getMaxCompactionTaskSlots()).thenReturn(newMaxTaskSlots);
target.add(compactionConfig, auditInfo2, AUDIT_TIME_2);
Assert.assertEquals(2, target.getHistory().size());
DataSourceCompactionConfigAuditEntry auditEntry = target.getHistory().get(0);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME, auditEntry.getAuditTime());
Assert.assertEquals(MAX_COMPACTION_TASK_SLOTS, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots());
auditEntry = target.getHistory().get(1);
Assert.assertEquals(DATASOURCE, auditEntry.getCompactionConfig().getDataSource());
Assert.assertEquals(auditInfo2, auditEntry.getAuditInfo());
Assert.assertEquals(AUDIT_TIME_2, auditEntry.getAuditTime());
Assert.assertEquals(newMaxTaskSlots, auditEntry.getGlobalConfig().getMaxCompactionTaskSlots());
}
@Test
public void testAddCompactionConfigDoesNotHaveDataSourceWithNoHistoryShouldNotAdd()
{
target = new DataSourceCompactionConfigHistory(DATASOURCE_NOT_EXISTS);
target.add(compactionConfig, auditInfo, AUDIT_TIME);
Assert.assertTrue(target.getHistory().isEmpty());
}
}

View File

@ -22,8 +22,10 @@ package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
@ -64,7 +66,10 @@ public class CoordinatorCompactionConfigsResourceTest
);
private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(
OLD_CONFIG));
private static final String DATASOURCE_NOT_EXISTS = "notExists";
@Mock
private JacksonConfigManager mockJacksonConfigManager;
@ -78,27 +83,40 @@ public class CoordinatorCompactionConfigsResourceTest
@Mock
private MetadataStorageTablesConfig mockConnectorConfig;
@Mock
private AuditManager mockAuditManager;
private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
@Before
public void setup()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(OLD_CONFIG_IN_BYTES);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(ORIGINAL_CONFIG);
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
Mockito.when(mockAuditManager.fetchAuditHistory(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any()
)
).thenReturn(ImmutableList.of());
coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
mockConnectorConfig,
mockAuditManager,
new DefaultObjectMapper()
);
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
}
@ -107,12 +125,14 @@ public class CoordinatorCompactionConfigsResourceTest
public void testSetCompactionTaskLimitWithExistingConfig()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
double compactionTaskSlotRatio = 0.5;
@ -140,12 +160,14 @@ public class CoordinatorCompactionConfigsResourceTest
public void testAddOrUpdateCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
@ -183,12 +205,14 @@ public class CoordinatorCompactionConfigsResourceTest
public void testDeleteCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final String datasourceName = "dataSource";
final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig(
@ -207,9 +231,10 @@ public class CoordinatorCompactionConfigsResourceTest
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(originalConfig);
String author = "maytas";
@ -255,23 +280,27 @@ public class CoordinatorCompactionConfigsResourceTest
public void testSetCompactionTaskLimitWithoutExistingConfig()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
double compactionTaskSlotRatio = 0.5;
@ -298,23 +327,27 @@ public class CoordinatorCompactionConfigsResourceTest
public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(
CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any()
)
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
@ -350,24 +383,37 @@ public class CoordinatorCompactionConfigsResourceTest
public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
{
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
)
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
)
).thenReturn(CoordinatorCompactionConfig.empty());
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(
"notExist",
DATASOURCE_NOT_EXISTS,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), result.getStatus());
}
@Test
public void testGetCompactionConfigHistoryForUnknownDataSourceShouldReturnNotFound()
{
Response response = coordinatorCompactionConfigsResource.getCompactionConfigHistory(
DATASOURCE_NOT_EXISTS,
null,
null
);
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
}
}