From e5633d7842f44464ed463a8718051c7542a81ccb Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 27 May 2021 16:34:32 -0700 Subject: [PATCH] Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311) * fix bug * add test * fix IT * fix checkstyle * address comments --- .travis.yml | 6 +- .../druid/common/config/ConfigManager.java | 7 +- .../common/config/ConfigManagerConfig.java | 8 + .../common/config/JacksonConfigManager.java | 20 ++- .../common/config/ConfigManagerTest.java | 22 ++- .../config/JacksonConfigManagerTest.java | 44 ++++++ integration-tests/docker/druid.sh | 2 +- .../docker/test-data/upgrade-sample-data.sql | 16 ++ .../org/apache/druid/tests/TestNGGroup.java | 2 + .../duty/ITAutoCompactionUpgradeTest.java | 121 +++++++++++++++ .../CoordinatorCompactionConfig.java | 17 +++ .../duty/KillCompactionConfig.java | 16 +- .../CoordinatorCompactionConfigsResource.java | 54 +++++-- .../duty/KillCompactionConfigTest.java | 142 ++++++++++++++---- ...rdinatorCompactionConfigsResourceTest.java | 88 ++++++++--- 15 files changed, 480 insertions(+), 85 deletions(-) create mode 100644 integration-tests/docker/test-data/upgrade-sample-data.sql create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java diff --git a/.travis.yml b/.travis.yml index 5fd56d89d6e..2fbc7d6b0f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -592,13 +592,13 @@ jobs: stage: Tests - phase 2 jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer" - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests" @@ -671,7 +671,7 @@ jobs: - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests" diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java index 7bad650197c..56cbcd5eba4 100644 --- a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java +++ b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java @@ -176,7 +176,7 @@ public class ConfigManager return set(key, serde, null, obj); } - public SetResult set(final String key, final ConfigSerde serde, @Nullable final T oldObject, final T newObject) + public SetResult set(final String key, final ConfigSerde serde, @Nullable final byte[] oldValue, final T newObject) { if (newObject == null || !started) { if (newObject == null) { @@ -191,11 +191,10 @@ public class ConfigManager try { return exec.submit( () -> { - if (oldObject == null) { + if (oldValue == null || !config.get().isEnableCompareAndSwap()) { dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes); } else { - final byte[] oldBytes = serde.serialize(oldObject); - MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldBytes, newBytes); + MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldValue, newBytes); boolean success = dbConnector.compareAndSwap(ImmutableList.of(metadataCASUpdate)); if (!success) { return SetResult.fail(new IllegalStateException("Config value has changed"), true); diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java index 5ef5816d524..472bacd5bfb 100644 --- a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java +++ b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java @@ -32,8 +32,16 @@ public class ConfigManagerConfig @NotNull private Period pollDuration = new Period("PT1M"); + @JsonProperty + private boolean enableCompareAndSwap = true; + public Period getPollDuration() { return pollDuration; } + + public boolean isEnableCompareAndSwap() + { + return enableCompareAndSwap; + } } diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java index 7e3eeeb641a..7cf78a5ca0b 100644 --- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java +++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java @@ -73,6 +73,16 @@ public class JacksonConfigManager return configManager.watchConfig(key, create(clazz, defaultVal)); } + public T convertByteToConfig(byte[] configInByte, Class clazz, T defaultVal) + { + if (configInByte == null) { + return defaultVal; + } else { + final ConfigSerde serde = create(clazz, defaultVal); + return serde.deserialize(configInByte); + } + } + /** * Set the config and add audit entry * @@ -90,14 +100,18 @@ public class JacksonConfigManager * * @param key of the config to set * @param oldValue old config value. If not null, then the update will only succeed if the insert - * happens when current database entry is the same as this value. If null, then the insert - * will not consider the current database entry. + * happens when current database entry is the same as this value. Note that the current database + * entry (in array of bytes) have to be exactly the same as the array of bytes of this value for + * update to succeed. If null, then the insert will not consider the current database entry. Note + * that this field intentionally uses byte array to be resilient across serde of existing data + * retrieved from the database (instead of Java object which may have additional fields added + * as a result of serde) * @param newValue new config value to insert * @param auditInfo metadata regarding the change to config, for audit purposes */ public SetResult set( String key, - @Nullable T oldValue, + @Nullable byte[] oldValue, T newValue, AuditInfo auditInfo ) diff --git a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java index 038aa3807ea..9705939991f 100644 --- a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java +++ b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java @@ -49,7 +49,7 @@ public class ConfigManagerTest { private static final String CONFIG_KEY = "configX"; private static final String TABLE_NAME = "config_table"; - private static final TestConfig OLD_CONFIG = new TestConfig("1", "x", 1); + private static final byte[] OLD_CONFIG = {1, 2, 3}; private static final TestConfig NEW_CONFIG = new TestConfig("2", "y", 2); @Mock @@ -120,6 +120,7 @@ public class ConfigManagerTest @Test public void testSetOldObjectNotNullShouldSwap() { + when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(true); when(mockDbConnector.compareAndSwap(any(List.class))).thenReturn(true); final ArgumentCaptor> updateCaptor = ArgumentCaptor.forClass(List.class); configManager.start(); @@ -134,10 +135,27 @@ public class ConfigManagerTest Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, updateCaptor.getValue().get(0).getKeyColumn()); Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, updateCaptor.getValue().get(0).getValueColumn()); Assert.assertEquals(CONFIG_KEY, updateCaptor.getValue().get(0).getKey()); - Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(OLD_CONFIG), updateCaptor.getValue().get(0).getOldValue()); + Assert.assertArrayEquals(OLD_CONFIG, updateCaptor.getValue().get(0).getOldValue()); Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(NEW_CONFIG), updateCaptor.getValue().get(0).getNewValue()); } + @Test + public void testSetOldObjectNotNullButCompareAndSwapDisabledShouldInsertWithoutSwap() + { + when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(false); + configManager.start(); + ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, OLD_CONFIG, NEW_CONFIG); + Assert.assertTrue(setResult.isOk()); + Mockito.verify(mockDbConnector).insertOrUpdate( + ArgumentMatchers.eq(TABLE_NAME), + ArgumentMatchers.anyString(), + ArgumentMatchers.anyString(), + ArgumentMatchers.eq(CONFIG_KEY), + ArgumentMatchers.any(byte[].class) + ); + Mockito.verifyNoMoreInteractions(mockDbConnector); + } + static class TestConfig { private final String version; diff --git a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java index 25220ebb892..4a4aaf72cd4 100644 --- a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java +++ b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java @@ -38,6 +38,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Objects; + @RunWith(MockitoJUnitRunner.class) public class JacksonConfigManagerTest { @@ -136,6 +138,27 @@ public class JacksonConfigManagerTest Assert.assertNotNull(configSerdeCapture.getValue()); } + @Test + public void testConvertByteToConfigWithNullConfigInByte() + { + TestConfig defaultExpected = new TestConfig("version", null, 3); + TestConfig actual = jacksonConfigManager.convertByteToConfig(null, TestConfig.class, defaultExpected); + Assert.assertEquals(defaultExpected, actual); + } + + @Test + public void testConvertByteToConfigWithNonNullConfigInByte() + { + ConfigSerde configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference() + { + }, null); + TestConfig defaultConfig = new TestConfig("version", null, 3); + TestConfig expectedConfig = new TestConfig("version2", null, 5); + byte[] expectedConfigInByte = configConfigSerdeFromTypeReference.serialize(expectedConfig); + + TestConfig actual = jacksonConfigManager.convertByteToConfig(expectedConfigInByte, TestConfig.class, defaultConfig); + Assert.assertEquals(expectedConfig, actual); + } static class TestConfig { @@ -169,6 +192,27 @@ public class JacksonConfigManagerTest { return settingInt; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestConfig config = (TestConfig) o; + return settingInt == config.settingInt && + Objects.equals(version, config.version) && + Objects.equals(settingString, config.settingString); + } + + @Override + public int hashCode() + { + return Objects.hash(version, settingString, settingInt); + } } static class ClassThatJacksonCannotSerialize diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index 0a4f00b9652..baa0e9710d7 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -85,7 +85,7 @@ setupData() # The "query" and "security" test groups require data to be setup before running the tests. # In particular, they requires segments to be download from a pre-existing s3 bucket. # This is done by using the loadSpec put into metadatastore and s3 credientials set below. - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ]; then # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop diff --git a/integration-tests/docker/test-data/upgrade-sample-data.sql b/integration-tests/docker/test-data/upgrade-sample-data.sql new file mode 100644 index 00000000000..a58fdab38ae --- /dev/null +++ b/integration-tests/docker/test-data/upgrade-sample-data.sql @@ -0,0 +1,16 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +INSERT INTO druid_config (name, payload) VALUES ('coordinator.compaction.config', '{"compactionConfigs":[{"dataSource":"upgradeTest","taskPriority":25,"inputSegmentSizeBytes":419430400,"maxRowsPerSegment":null,"skipOffsetFromLatest":"P1D","tuningConfig":{"maxRowsInMemory":null,"maxBytesInMemory":null,"maxTotalRows":null,"splitHintSpec":null,"partitionsSpec":{"type":"hashed","numShards":null,"partitionDimensions":[],"partitionFunction":"murmur3_32_abs","maxRowsPerSegment":5000000},"indexSpec":null,"indexSpecForIntermediatePersists":null,"maxPendingPersists":null,"pushTimeout":null,"segmentWriteOutMediumFactory":null,"maxNumConcurrentSubTasks":null,"maxRetry":null,"taskStatusCheckPeriodMs":null,"chatHandlerTimeout":null,"chatHandlerNumRetries":null,"maxNumSegmentsToMerge":null,"totalNumMergeTasks":null,"forceGuaranteedRollup":true,"type":"index_parallel"},"taskContext":null}],"compactionTaskSlotRatio":0.1,"maxCompactionTaskSlots":2147483647}'); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 8b962d2d1b5..bd346ffa318 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -43,6 +43,8 @@ public class TestNGGroup public static final String COMPACTION = "compaction"; + public static final String UPGRADE = "upgrade"; + public static final String APPEND_INGESTION = "append-ingestion"; public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index"; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java new file mode 100644 index 00000000000..b9607079618 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.coordinator.duty; + +import com.google.inject.Inject; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CompactionResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractIndexerTest; +import org.joda.time.Period; +import org.testng.Assert; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = {TestNGGroup.UPGRADE}) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest +{ + private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class); + private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest"; + + @Inject + protected CompactionResourceTestClient compactionResource; + + @Inject + private IntegrationTestingConfig config; + + @Test + public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVersionAlreadyExist() throws Exception + { + // Verify that compaction config already exist. This config was inserted manually into the database using SQL script. + // This auto compaction configuration payload is from Druid 0.21.0 + CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + DataSourceCompactionConfig foundDataSourceCompactionConfig = null; + for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { + if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) { + foundDataSourceCompactionConfig = dataSourceCompactionConfig; + } + } + Assert.assertNotNull(foundDataSourceCompactionConfig); + + // Now submit a new auto compaction configuration + PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null); + Period newSkipOffset = Period.seconds(0); + + DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( + UPGRADE_DATASOURCE_NAME, + null, + null, + null, + newSkipOffset, + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + new MaxSizeSplitHintSpec(null, 1), + newPartitionsSpec, + null, + null, + null, + null, + null, + 1, + null, + null, + null, + null, + null, + 1 + ), + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + new UserCompactionTaskIOConfig(true), + null + ); + compactionResource.submitCompactionConfig(compactionConfig); + + // Wait for compaction config to persist + Thread.sleep(2000); + + // Verify that compaction was successfully updated + coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + foundDataSourceCompactionConfig = null; + for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) { + if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) { + foundDataSourceCompactionConfig = dataSourceCompactionConfig; + } + } + Assert.assertNotNull(foundDataSourceCompactionConfig); + Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig()); + Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec); + Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), newSkipOffset); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java index cc1fdf61e34..409a8135ff1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataStorageTablesConfig; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -86,6 +88,21 @@ public class CoordinatorCompactionConfig ); } + public static byte[] getConfigInByteFromDb(final MetadataStorageConnector connector, MetadataStorageTablesConfig config) + { + return connector.lookup( + config.getConfigTable(), + "name", + "payload", + CoordinatorCompactionConfig.CONFIG_KEY + ); + } + + public static CoordinatorCompactionConfig convertByteToConfig(final JacksonConfigManager configManager, byte[] configInByte) + { + return configManager.convertByteToConfig(configInByte, CoordinatorCompactionConfig.class, CoordinatorCompactionConfig.empty()); + } + @Nonnull public static CoordinatorCompactionConfig current(final JacksonConfigManager configManager) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java index cd54cf3d8d6..74645b51b4e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java @@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -58,17 +60,23 @@ public class KillCompactionConfig implements CoordinatorDuty private final JacksonConfigManager jacksonConfigManager; private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager; + private final MetadataStorageConnector connector; + private final MetadataStorageTablesConfig connectorConfig; @Inject public KillCompactionConfig( DruidCoordinatorConfig config, SqlSegmentsMetadataManager sqlSegmentsMetadataManager, - JacksonConfigManager jacksonConfigManager + JacksonConfigManager jacksonConfigManager, + MetadataStorageConnector connector, + MetadataStorageTablesConfig connectorConfig ) { this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager; this.jacksonConfigManager = jacksonConfigManager; this.period = config.getCoordinatorCompactionKillPeriod().getMillis(); + this.connector = connector; + this.connectorConfig = connectorConfig; Preconditions.checkArgument( this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), "Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" @@ -88,7 +96,8 @@ public class KillCompactionConfig implements CoordinatorDuty try { RetryUtils.retry( () -> { - CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(jacksonConfigManager); + final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes); // If current compaction config is empty then there is nothing to do if (CoordinatorCompactionConfig.empty().equals(current)) { log.info( @@ -112,8 +121,7 @@ public class KillCompactionConfig implements CoordinatorDuty ConfigManager.SetResult result = jacksonConfigManager.set( CoordinatorCompactionConfig.CONFIG_KEY, - // Do database insert without swap if the current config is empty as this means the config may be null in the database - current, + currentBytes, CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())), new AuditInfo( "KillCompactionConfig", diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index a5cd6112dce..cb17c45e7c8 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -28,6 +28,9 @@ import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -57,15 +60,24 @@ import java.util.stream.Collectors; @ResourceFilters(ConfigResourceFilter.class) public class CoordinatorCompactionConfigsResource { + private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class); private static final long UPDATE_RETRY_DELAY = 1000; static final int UPDATE_NUM_RETRY = 5; private final JacksonConfigManager manager; + private final MetadataStorageConnector connector; + private final MetadataStorageTablesConfig connectorConfig; @Inject - public CoordinatorCompactionConfigsResource(JacksonConfigManager manager) + public CoordinatorCompactionConfigsResource( + JacksonConfigManager manager, + MetadataStorageConnector connector, + MetadataStorageTablesConfig connectorConfig + ) { this.manager = manager; + this.connector = connector; + this.connectorConfig = connectorConfig; } @GET @@ -87,8 +99,8 @@ public class CoordinatorCompactionConfigsResource ) { Callable callable = () -> { - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); - + final byte[] currentBytes = getCurrentConfigInByteFromDb(); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from( current, compactionTaskSlotRatio, @@ -97,8 +109,7 @@ public class CoordinatorCompactionConfigsResource return manager.set( CoordinatorCompactionConfig.CONFIG_KEY, - // Do database insert without swap if the current config is empty as this means the config may be null in the database - CoordinatorCompactionConfig.empty().equals(current) ? null : current, + currentBytes, newCompactionConfig, new AuditInfo(author, comment, req.getRemoteAddr()) ); @@ -116,7 +127,8 @@ public class CoordinatorCompactionConfigsResource ) { Callable callable = () -> { - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); + final byte[] currentBytes = getCurrentConfigInByteFromDb(); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); final CoordinatorCompactionConfig newCompactionConfig; final Map newConfigs = current .getCompactionConfigs() @@ -127,8 +139,7 @@ public class CoordinatorCompactionConfigsResource return manager.set( CoordinatorCompactionConfig.CONFIG_KEY, - // Do database insert without swap if the current config is empty as this means the config may be null in the database - CoordinatorCompactionConfig.empty().equals(current) ? null : current, + currentBytes, newCompactionConfig, new AuditInfo(author, comment, req.getRemoteAddr()) ); @@ -166,7 +177,8 @@ public class CoordinatorCompactionConfigsResource ) { Callable callable = () -> { - final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); + final byte[] currentBytes = getCurrentConfigInByteFromDb(); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes); final Map configs = current .getCompactionConfigs() .stream() @@ -179,8 +191,7 @@ public class CoordinatorCompactionConfigsResource return manager.set( CoordinatorCompactionConfig.CONFIG_KEY, - // Do database insert without swap if the current config is empty as this means the config may be null in the database - CoordinatorCompactionConfig.empty().equals(current) ? null : current, + currentBytes, CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())), new AuditInfo(author, comment, req.getRemoteAddr()) ); @@ -204,18 +215,21 @@ public class CoordinatorCompactionConfigsResource } } catch (Exception e) { + LOG.warn(e, "Update compaction config failed"); return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(ImmutableMap.of("error", e)) + .entity(ImmutableMap.of("error", createErrorMessage(e))) .build(); } if (setResult.isOk()) { return Response.ok().build(); } else if (setResult.getException() instanceof NoSuchElementException) { + LOG.warn(setResult.getException(), "Update compaction config failed"); return Response.status(Response.Status.NOT_FOUND).build(); } else { + LOG.warn(setResult.getException(), "Update compaction config failed"); return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", setResult.getException())) + .entity(ImmutableMap.of("error", createErrorMessage(setResult.getException()))) .build(); } } @@ -229,4 +243,18 @@ public class CoordinatorCompactionConfigsResource throw new RuntimeException(ie); } } + + private byte[] getCurrentConfigInByteFromDb() + { + return CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig); + } + + private String createErrorMessage(Exception e) + { + if (e.getMessage() == null) { + return "Unknown Error"; + } else { + return e.getMessage(); + } + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java index 555c9981473..4aad5df6033 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java @@ -27,6 +27,8 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -36,6 +38,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.joda.time.Duration; import org.joda.time.Period; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -48,8 +51,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import java.util.concurrent.atomic.AtomicReference; - @RunWith(MockitoJUnitRunner.class) public class KillCompactionConfigTest { @@ -65,11 +66,23 @@ public class KillCompactionConfigTest @Mock private JacksonConfigManager mockJacksonConfigManager; + @Mock + private MetadataStorageConnector mockConnector; + + @Mock + private MetadataStorageTablesConfig mockConnectorConfig; + @Rule public ExpectedException exception = ExpectedException.none(); private KillCompactionConfig killCompactionConfig; + @Before + public void setup() + { + Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config"); + } + @Test public void testRunSkipIfLastRunLessThanPeriod() { @@ -93,7 +106,13 @@ public class KillCompactionConfigTest 10, null ); - killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager); + killCompactionConfig = new KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager); Mockito.verifyZeroInteractions(mockJacksonConfigManager); @@ -125,7 +144,13 @@ public class KillCompactionConfigTest ); exception.expect(IllegalArgumentException.class); exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager); + killCompactionConfig = new KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); } @@ -134,11 +159,17 @@ public class KillCompactionConfigTest { Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); // Set current compaction config to an empty compaction config - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(null); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())); + ).thenReturn(CoordinatorCompactionConfig.empty()); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( null, @@ -160,18 +191,30 @@ public class KillCompactionConfigTest 10, null ); - killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager); + killCompactionConfig = new KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager); final ArgumentCaptor emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture()); Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric")); Assert.assertEquals(0, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); - Mockito.verify(mockJacksonConfigManager).watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.verify(mockJacksonConfigManager).convertByteToConfig( + ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) ); + Mockito.verify(mockConnector).lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ); Mockito.verifyNoMoreInteractions(mockJacksonConfigManager); } @@ -204,14 +247,21 @@ public class KillCompactionConfigTest ImmutableMap.of("key", "val") ); CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig)); - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + byte[] originalCurrentConfigBytes = {1, 2, 3}; + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(originalCurrentConfigBytes); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(originalCurrentConfigBytes), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(originalCurrentConfig)); + ).thenReturn(originalCurrentConfig); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName)); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -240,12 +290,18 @@ public class KillCompactionConfigTest 10, null ); - killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager); + killCompactionConfig = new KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); // Verify and Assert Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfig); + Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfigBytes); Assert.assertNotNull(newConfigCaptor.getValue()); // The updated config should only contains one compaction config for the active datasource Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size()); @@ -257,14 +313,20 @@ public class KillCompactionConfigTest // Should delete 1 config Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); - Mockito.verify(mockJacksonConfigManager).watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.verify(mockJacksonConfigManager).convertByteToConfig( + ArgumentMatchers.eq(originalCurrentConfigBytes), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) ); + Mockito.verify(mockConnector).lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ); Mockito.verify(mockJacksonConfigManager).set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(byte[].class), ArgumentMatchers.any(CoordinatorCompactionConfig.class), ArgumentMatchers.any() ); @@ -290,16 +352,23 @@ public class KillCompactionConfigTest ); CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig)); - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + byte[] originalCurrentConfigBytes = {1, 2, 3}; + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(originalCurrentConfigBytes); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(originalCurrentConfigBytes), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(originalCurrentConfig)); + ).thenReturn(originalCurrentConfig); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of()); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(byte[].class), ArgumentMatchers.any(CoordinatorCompactionConfig.class), ArgumentMatchers.any()) ).thenAnswer(new Answer() { @@ -337,7 +406,13 @@ public class KillCompactionConfigTest 10, null ); - killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager); + killCompactionConfig = new KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); // Verify and Assert @@ -347,16 +422,23 @@ public class KillCompactionConfigTest // Should delete 1 config Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); - // Should call watch (to refresh current compaction config) four times due to RetryableException when failed - Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + // Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed + Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig( + ArgumentMatchers.eq(originalCurrentConfigBytes), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()) ); + Mockito.verify(mockConnector, Mockito.times(4)).lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY) + ); + // Should call set (to try set new updated compaction config) four times due to RetryableException when failed Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), - ArgumentMatchers.any(CoordinatorCompactionConfig.class), + ArgumentMatchers.any(byte[].class), ArgumentMatchers.any(CoordinatorCompactionConfig.class), ArgumentMatchers.any() ); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 988fd2b992b..c130dc3ea4c 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.mutable.MutableInt; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.MetadataStorageConnector; +import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; @@ -42,7 +44,6 @@ import org.mockito.junit.MockitoJUnitRunner; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; @RunWith(MockitoJUnitRunner.class) public class CoordinatorCompactionConfigsResourceTest @@ -58,6 +59,8 @@ public class CoordinatorCompactionConfigsResourceTest null, ImmutableMap.of("key", "val") ); + private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3}; + private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG)); @Mock @@ -66,24 +69,41 @@ public class CoordinatorCompactionConfigsResourceTest @Mock private HttpServletRequest mockHttpServletRequest; + @Mock + private MetadataStorageConnector mockConnector; + + @Mock + private MetadataStorageTablesConfig mockConnectorConfig; + private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource; @Before public void setup() { - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(OLD_CONFIG_IN_BYTES); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(ORIGINAL_CONFIG)); - coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(mockJacksonConfigManager); + ).thenReturn(ORIGINAL_CONFIG); + Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config"); + coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource( + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ); Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123"); } @Test public void testSetCompactionTaskLimitWithExistingConfig() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -105,7 +125,7 @@ public class CoordinatorCompactionConfigsResourceTest ); Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG); + Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); Assert.assertNotNull(newConfigCaptor.getValue()); Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots); Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0); @@ -114,7 +134,7 @@ public class CoordinatorCompactionConfigsResourceTest @Test public void testAddOrUpdateCompactionConfigWithExistingConfig() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -144,7 +164,7 @@ public class CoordinatorCompactionConfigsResourceTest ); Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG); + Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); Assert.assertNotNull(newConfigCaptor.getValue()); Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size()); Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0)); @@ -154,7 +174,7 @@ public class CoordinatorCompactionConfigsResourceTest @Test public void testDeleteCompactionConfigWithExistingConfig() { - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -175,11 +195,11 @@ public class CoordinatorCompactionConfigsResourceTest ImmutableMap.of("key", "val") ); final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete)); - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(originalConfig)); + ).thenReturn(originalConfig); String author = "maytas"; String comment = "hello"; @@ -191,7 +211,7 @@ public class CoordinatorCompactionConfigsResourceTest ); Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); Assert.assertNotNull(oldConfigCaptor.getValue()); - Assert.assertEquals(oldConfigCaptor.getValue(), originalConfig); + Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); Assert.assertNotNull(newConfigCaptor.getValue()); Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size()); } @@ -223,12 +243,18 @@ public class CoordinatorCompactionConfigsResourceTest @Test public void testSetCompactionTaskLimitWithoutExistingConfig() { - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(null); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + ).thenReturn(CoordinatorCompactionConfig.empty()); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -258,12 +284,18 @@ public class CoordinatorCompactionConfigsResourceTest @Test public void testAddOrUpdateCompactionConfigWithoutExistingConfig() { - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(null); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())); - final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); + ).thenReturn(CoordinatorCompactionConfig.empty()); + final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), @@ -301,11 +333,17 @@ public class CoordinatorCompactionConfigsResourceTest @Test public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist() { - Mockito.when(mockJacksonConfigManager.watch( - ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), + Mockito.when(mockConnector.lookup( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("name"), + ArgumentMatchers.eq("payload"), + ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)) + ).thenReturn(null); + Mockito.when(mockJacksonConfigManager.convertByteToConfig( + ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) - ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())); + ).thenReturn(CoordinatorCompactionConfig.empty()); String author = "maytas"; String comment = "hello"; Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(