Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)

* fix bug

* add test

* fix IT

* fix checkstyle

* address comments
This commit is contained in:
Maytas Monsereenusorn 2021-05-27 16:34:32 -07:00 committed by GitHub
parent 60843bd11f
commit e5633d7842
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 480 additions and 85 deletions

View File

@ -592,13 +592,13 @@ jobs:
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
@ -671,7 +671,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"

View File

@ -176,7 +176,7 @@ public class ConfigManager
return set(key, serde, null, obj);
}
public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final T oldObject, final T newObject)
public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final byte[] oldValue, final T newObject)
{
if (newObject == null || !started) {
if (newObject == null) {
@ -191,11 +191,10 @@ public class ConfigManager
try {
return exec.submit(
() -> {
if (oldObject == null) {
if (oldValue == null || !config.get().isEnableCompareAndSwap()) {
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
} else {
final byte[] oldBytes = serde.serialize(oldObject);
MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldBytes, newBytes);
MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldValue, newBytes);
boolean success = dbConnector.compareAndSwap(ImmutableList.of(metadataCASUpdate));
if (!success) {
return SetResult.fail(new IllegalStateException("Config value has changed"), true);

View File

@ -32,8 +32,16 @@ public class ConfigManagerConfig
@NotNull
private Period pollDuration = new Period("PT1M");
@JsonProperty
private boolean enableCompareAndSwap = true;
public Period getPollDuration()
{
return pollDuration;
}
public boolean isEnableCompareAndSwap()
{
return enableCompareAndSwap;
}
}

View File

@ -73,6 +73,16 @@ public class JacksonConfigManager
return configManager.watchConfig(key, create(clazz, defaultVal));
}
public <T> T convertByteToConfig(byte[] configInByte, Class<? extends T> clazz, T defaultVal)
{
if (configInByte == null) {
return defaultVal;
} else {
final ConfigSerde<T> serde = create(clazz, defaultVal);
return serde.deserialize(configInByte);
}
}
/**
* Set the config and add audit entry
*
@ -90,14 +100,18 @@ public class JacksonConfigManager
*
* @param key of the config to set
* @param oldValue old config value. If not null, then the update will only succeed if the insert
* happens when current database entry is the same as this value. If null, then the insert
* will not consider the current database entry.
* happens when current database entry is the same as this value. Note that the current database
* entry (in array of bytes) have to be exactly the same as the array of bytes of this value for
* update to succeed. If null, then the insert will not consider the current database entry. Note
* that this field intentionally uses byte array to be resilient across serde of existing data
* retrieved from the database (instead of Java object which may have additional fields added
* as a result of serde)
* @param newValue new config value to insert
* @param auditInfo metadata regarding the change to config, for audit purposes
*/
public <T> SetResult set(
String key,
@Nullable T oldValue,
@Nullable byte[] oldValue,
T newValue,
AuditInfo auditInfo
)

View File

@ -49,7 +49,7 @@ public class ConfigManagerTest
{
private static final String CONFIG_KEY = "configX";
private static final String TABLE_NAME = "config_table";
private static final TestConfig OLD_CONFIG = new TestConfig("1", "x", 1);
private static final byte[] OLD_CONFIG = {1, 2, 3};
private static final TestConfig NEW_CONFIG = new TestConfig("2", "y", 2);
@Mock
@ -120,6 +120,7 @@ public class ConfigManagerTest
@Test
public void testSetOldObjectNotNullShouldSwap()
{
when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(true);
when(mockDbConnector.compareAndSwap(any(List.class))).thenReturn(true);
final ArgumentCaptor<List<MetadataCASUpdate>> updateCaptor = ArgumentCaptor.forClass(List.class);
configManager.start();
@ -134,10 +135,27 @@ public class ConfigManagerTest
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, updateCaptor.getValue().get(0).getKeyColumn());
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, updateCaptor.getValue().get(0).getValueColumn());
Assert.assertEquals(CONFIG_KEY, updateCaptor.getValue().get(0).getKey());
Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(OLD_CONFIG), updateCaptor.getValue().get(0).getOldValue());
Assert.assertArrayEquals(OLD_CONFIG, updateCaptor.getValue().get(0).getOldValue());
Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(NEW_CONFIG), updateCaptor.getValue().get(0).getNewValue());
}
@Test
public void testSetOldObjectNotNullButCompareAndSwapDisabledShouldInsertWithoutSwap()
{
when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(false);
configManager.start();
ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, OLD_CONFIG, NEW_CONFIG);
Assert.assertTrue(setResult.isOk());
Mockito.verify(mockDbConnector).insertOrUpdate(
ArgumentMatchers.eq(TABLE_NAME),
ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(),
ArgumentMatchers.eq(CONFIG_KEY),
ArgumentMatchers.any(byte[].class)
);
Mockito.verifyNoMoreInteractions(mockDbConnector);
}
static class TestConfig
{
private final String version;

View File

@ -38,6 +38,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Objects;
@RunWith(MockitoJUnitRunner.class)
public class JacksonConfigManagerTest
{
@ -136,6 +138,27 @@ public class JacksonConfigManagerTest
Assert.assertNotNull(configSerdeCapture.getValue());
}
@Test
public void testConvertByteToConfigWithNullConfigInByte()
{
TestConfig defaultExpected = new TestConfig("version", null, 3);
TestConfig actual = jacksonConfigManager.convertByteToConfig(null, TestConfig.class, defaultExpected);
Assert.assertEquals(defaultExpected, actual);
}
@Test
public void testConvertByteToConfigWithNonNullConfigInByte()
{
ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
{
}, null);
TestConfig defaultConfig = new TestConfig("version", null, 3);
TestConfig expectedConfig = new TestConfig("version2", null, 5);
byte[] expectedConfigInByte = configConfigSerdeFromTypeReference.serialize(expectedConfig);
TestConfig actual = jacksonConfigManager.convertByteToConfig(expectedConfigInByte, TestConfig.class, defaultConfig);
Assert.assertEquals(expectedConfig, actual);
}
static class TestConfig
{
@ -169,6 +192,27 @@ public class JacksonConfigManagerTest
{
return settingInt;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestConfig config = (TestConfig) o;
return settingInt == config.settingInt &&
Objects.equals(version, config.version) &&
Objects.equals(settingString, config.settingString);
}
@Override
public int hashCode()
{
return Objects.hash(version, settingString, settingInt);
}
}
static class ClassThatJacksonCannotSerialize

View File

@ -85,7 +85,7 @@ setupData()
# The "query" and "security" test groups require data to be setup before running the tests.
# In particular, they requires segments to be download from a pre-existing s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3 credientials set below.
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop

View File

@ -0,0 +1,16 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
INSERT INTO druid_config (name, payload) VALUES ('coordinator.compaction.config', '{"compactionConfigs":[{"dataSource":"upgradeTest","taskPriority":25,"inputSegmentSizeBytes":419430400,"maxRowsPerSegment":null,"skipOffsetFromLatest":"P1D","tuningConfig":{"maxRowsInMemory":null,"maxBytesInMemory":null,"maxTotalRows":null,"splitHintSpec":null,"partitionsSpec":{"type":"hashed","numShards":null,"partitionDimensions":[],"partitionFunction":"murmur3_32_abs","maxRowsPerSegment":5000000},"indexSpec":null,"indexSpecForIntermediatePersists":null,"maxPendingPersists":null,"pushTimeout":null,"segmentWriteOutMediumFactory":null,"maxNumConcurrentSubTasks":null,"maxRetry":null,"taskStatusCheckPeriodMs":null,"chatHandlerTimeout":null,"chatHandlerNumRetries":null,"maxNumSegmentsToMerge":null,"totalNumMergeTasks":null,"forceGuaranteedRollup":true,"type":"index_parallel"},"taskContext":null}],"compactionTaskSlotRatio":0.1,"maxCompactionTaskSlots":2147483647}');

View File

@ -43,6 +43,8 @@ public class TestNGGroup
public static final String COMPACTION = "compaction";
public static final String UPGRADE = "upgrade";
public static final String APPEND_INGESTION = "append-ingestion";
public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.coordinator.duty;
import com.google.inject.Inject;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.joda.time.Period;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Test(groups = {TestNGGroup.UPGRADE})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class);
private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest";
@Inject
protected CompactionResourceTestClient compactionResource;
@Inject
private IntegrationTestingConfig config;
@Test
public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVersionAlreadyExist() throws Exception
{
// Verify that compaction config already exist. This config was inserted manually into the database using SQL script.
// This auto compaction configuration payload is from Druid 0.21.0
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
Assert.assertNotNull(foundDataSourceCompactionConfig);
// Now submit a new auto compaction configuration
PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null);
Period newSkipOffset = Period.seconds(0);
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
UPGRADE_DATASOURCE_NAME,
null,
null,
null,
newSkipOffset,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
newPartitionsSpec,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
1
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
new UserCompactionTaskIOConfig(true),
null
);
compactionResource.submitCompactionConfig(compactionConfig);
// Wait for compaction config to persist
Thread.sleep(2000);
// Verify that compaction was successfully updated
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec);
Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), newSkipOffset);
}
}

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -86,6 +88,21 @@ public class CoordinatorCompactionConfig
);
}
public static byte[] getConfigInByteFromDb(final MetadataStorageConnector connector, MetadataStorageTablesConfig config)
{
return connector.lookup(
config.getConfigTable(),
"name",
"payload",
CoordinatorCompactionConfig.CONFIG_KEY
);
}
public static CoordinatorCompactionConfig convertByteToConfig(final JacksonConfigManager configManager, byte[] configInByte)
{
return configManager.convertByteToConfig(configInByte, CoordinatorCompactionConfig.class, CoordinatorCompactionConfig.empty());
}
@Nonnull
public static CoordinatorCompactionConfig current(final JacksonConfigManager configManager)
{

View File

@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -58,17 +60,23 @@ public class KillCompactionConfig implements CoordinatorDuty
private final JacksonConfigManager jacksonConfigManager;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private final MetadataStorageConnector connector;
private final MetadataStorageTablesConfig connectorConfig;
@Inject
public KillCompactionConfig(
DruidCoordinatorConfig config,
SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
JacksonConfigManager jacksonConfigManager
JacksonConfigManager jacksonConfigManager,
MetadataStorageConnector connector,
MetadataStorageTablesConfig connectorConfig
)
{
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.jacksonConfigManager = jacksonConfigManager;
this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
this.connector = connector;
this.connectorConfig = connectorConfig;
Preconditions.checkArgument(
this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
"Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
@ -88,7 +96,8 @@ public class KillCompactionConfig implements CoordinatorDuty
try {
RetryUtils.retry(
() -> {
CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(jacksonConfigManager);
final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes);
// If current compaction config is empty then there is nothing to do
if (CoordinatorCompactionConfig.empty().equals(current)) {
log.info(
@ -112,8 +121,7 @@ public class KillCompactionConfig implements CoordinatorDuty
ConfigManager.SetResult result = jacksonConfigManager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
// Do database insert without swap if the current config is empty as this means the config may be null in the database
current,
currentBytes,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())),
new AuditInfo(
"KillCompactionConfig",

View File

@ -28,6 +28,9 @@ import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
@ -57,15 +60,24 @@ import java.util.stream.Collectors;
@ResourceFilters(ConfigResourceFilter.class)
public class CoordinatorCompactionConfigsResource
{
private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class);
private static final long UPDATE_RETRY_DELAY = 1000;
static final int UPDATE_NUM_RETRY = 5;
private final JacksonConfigManager manager;
private final MetadataStorageConnector connector;
private final MetadataStorageTablesConfig connectorConfig;
@Inject
public CoordinatorCompactionConfigsResource(JacksonConfigManager manager)
public CoordinatorCompactionConfigsResource(
JacksonConfigManager manager,
MetadataStorageConnector connector,
MetadataStorageTablesConfig connectorConfig
)
{
this.manager = manager;
this.connector = connector;
this.connectorConfig = connectorConfig;
}
@GET
@ -87,8 +99,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
@ -97,8 +109,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
// Do database insert without swap if the current config is empty as this means the config may be null in the database
CoordinatorCompactionConfig.empty().equals(current) ? null : current,
currentBytes,
newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
);
@ -116,7 +127,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig newCompactionConfig;
final Map<String, DataSourceCompactionConfig> newConfigs = current
.getCompactionConfigs()
@ -127,8 +139,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
// Do database insert without swap if the current config is empty as this means the config may be null in the database
CoordinatorCompactionConfig.empty().equals(current) ? null : current,
currentBytes,
newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
);
@ -166,7 +177,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
final byte[] currentBytes = getCurrentConfigInByteFromDb();
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()
@ -179,8 +191,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
// Do database insert without swap if the current config is empty as this means the config may be null in the database
CoordinatorCompactionConfig.empty().equals(current) ? null : current,
currentBytes,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())),
new AuditInfo(author, comment, req.getRemoteAddr())
);
@ -204,18 +215,21 @@ public class CoordinatorCompactionConfigsResource
}
}
catch (Exception e) {
LOG.warn(e, "Update compaction config failed");
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(ImmutableMap.of("error", e))
.entity(ImmutableMap.of("error", createErrorMessage(e)))
.build();
}
if (setResult.isOk()) {
return Response.ok().build();
} else if (setResult.getException() instanceof NoSuchElementException) {
LOG.warn(setResult.getException(), "Update compaction config failed");
return Response.status(Response.Status.NOT_FOUND).build();
} else {
LOG.warn(setResult.getException(), "Update compaction config failed");
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", setResult.getException()))
.entity(ImmutableMap.of("error", createErrorMessage(setResult.getException())))
.build();
}
}
@ -229,4 +243,18 @@ public class CoordinatorCompactionConfigsResource
throw new RuntimeException(ie);
}
}
private byte[] getCurrentConfigInByteFromDb()
{
return CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
}
private String createErrorMessage(Exception e)
{
if (e.getMessage() == null) {
return "Unknown Error";
} else {
return e.getMessage();
}
}
}

View File

@ -27,6 +27,8 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -36,6 +38,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -48,8 +51,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.util.concurrent.atomic.AtomicReference;
@RunWith(MockitoJUnitRunner.class)
public class KillCompactionConfigTest
{
@ -65,11 +66,23 @@ public class KillCompactionConfigTest
@Mock
private JacksonConfigManager mockJacksonConfigManager;
@Mock
private MetadataStorageConnector mockConnector;
@Mock
private MetadataStorageTablesConfig mockConnectorConfig;
@Rule
public ExpectedException exception = ExpectedException.none();
private KillCompactionConfig killCompactionConfig;
@Before
public void setup()
{
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
}
@Test
public void testRunSkipIfLastRunLessThanPeriod()
{
@ -93,7 +106,13 @@ public class KillCompactionConfigTest
10,
null
);
killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig,
mockSqlSegmentsMetadataManager,
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
Mockito.verifyZeroInteractions(mockJacksonConfigManager);
@ -125,7 +144,13 @@ public class KillCompactionConfigTest
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig,
mockSqlSegmentsMetadataManager,
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
}
@ -134,11 +159,17 @@ public class KillCompactionConfigTest
{
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
// Set current compaction config to an empty compaction config
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
).thenReturn(CoordinatorCompactionConfig.empty());
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
@ -160,18 +191,30 @@ public class KillCompactionConfigTest
10,
null
);
killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig,
mockSqlSegmentsMetadataManager,
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
Assert.assertEquals(0, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
Mockito.verify(mockJacksonConfigManager).watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
Mockito.verify(mockConnector).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
);
Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);
}
@ -204,14 +247,21 @@ public class KillCompactionConfigTest
ImmutableMap.of("key", "val")
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
byte[] originalCurrentConfigBytes = {1, 2, 3};
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(originalCurrentConfigBytes);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(originalCurrentConfig));
).thenReturn(originalCurrentConfig);
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -240,12 +290,18 @@ public class KillCompactionConfigTest
10,
null
);
killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig,
mockSqlSegmentsMetadataManager,
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
// Verify and Assert
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfig);
Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfigBytes);
Assert.assertNotNull(newConfigCaptor.getValue());
// The updated config should only contains one compaction config for the active datasource
Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size());
@ -257,14 +313,20 @@ public class KillCompactionConfigTest
// Should delete 1 config
Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
Mockito.verify(mockJacksonConfigManager).watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
Mockito.verify(mockConnector).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
);
Mockito.verify(mockJacksonConfigManager).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any()
);
@ -290,16 +352,23 @@ public class KillCompactionConfigTest
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig));
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
byte[] originalCurrentConfigBytes = {1, 2, 3};
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(originalCurrentConfigBytes);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(originalCurrentConfig));
).thenReturn(originalCurrentConfig);
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any())
).thenAnswer(new Answer() {
@ -337,7 +406,13 @@ public class KillCompactionConfigTest
10,
null
);
killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
killCompactionConfig = new KillCompactionConfig(
druidCoordinatorConfig,
mockSqlSegmentsMetadataManager,
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
// Verify and Assert
@ -347,16 +422,23 @@ public class KillCompactionConfigTest
// Should delete 1 config
Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
// Should call watch (to refresh current compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
// Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
Mockito.verify(mockConnector, Mockito.times(4)).lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
);
// Should call set (to try set new updated compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any()
);

View File

@ -25,6 +25,8 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
@ -42,7 +44,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
@RunWith(MockitoJUnitRunner.class)
public class CoordinatorCompactionConfigsResourceTest
@ -58,6 +59,8 @@ public class CoordinatorCompactionConfigsResourceTest
null,
ImmutableMap.of("key", "val")
);
private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
@Mock
@ -66,24 +69,41 @@ public class CoordinatorCompactionConfigsResourceTest
@Mock
private HttpServletRequest mockHttpServletRequest;
@Mock
private MetadataStorageConnector mockConnector;
@Mock
private MetadataStorageTablesConfig mockConnectorConfig;
private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
@Before
public void setup()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(OLD_CONFIG_IN_BYTES);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(ORIGINAL_CONFIG));
coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(mockJacksonConfigManager);
).thenReturn(ORIGINAL_CONFIG);
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(
mockJacksonConfigManager,
mockConnector,
mockConnectorConfig
);
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
}
@Test
public void testSetCompactionTaskLimitWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -105,7 +125,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
@ -114,7 +134,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testAddOrUpdateCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -144,7 +164,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0));
@ -154,7 +174,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testDeleteCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -175,11 +195,11 @@ public class CoordinatorCompactionConfigsResourceTest
ImmutableMap.of("key", "val")
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(originalConfig));
).thenReturn(originalConfig);
String author = "maytas";
String comment = "hello";
@ -191,7 +211,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), originalConfig);
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size());
}
@ -223,12 +243,18 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testSetCompactionTaskLimitWithoutExistingConfig()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -258,12 +284,18 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
).thenReturn(CoordinatorCompactionConfig.empty());
final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@ -301,11 +333,17 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("name"),
ArgumentMatchers.eq("payload"),
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
).thenReturn(null);
Mockito.when(mockJacksonConfigManager.convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
).thenReturn(CoordinatorCompactionConfig.empty());
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(