From 9b9e1cfecb360812c4e9f4497bda49a62d27159c Mon Sep 17 00:00:00 2001 From: Himanshu Date: Wed, 26 Apr 2017 16:59:20 -0500 Subject: [PATCH] coordinator dynamic config POST to update only explicitly specified fields (#4141) * coordinator dynamic config POST to update only explicitly specified fields instead of resetting everything else to zeros * address review comments --- .../coordinator/CoordinatorDynamicConfig.java | 107 +++++++++++++----- .../http/CoordinatorDynamicConfigTest.java | 46 +++++++- 2 files changed, 122 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java index 8d322fe6b44..b509d259d90 100644 --- a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -18,10 +18,11 @@ */ package io.druid.server.coordinator; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; - +import io.druid.common.config.JacksonConfigManager; import io.druid.java.util.common.IAE; import java.util.Collection; @@ -45,20 +46,70 @@ public class CoordinatorDynamicConfig @JsonCreator public CoordinatorDynamicConfig( - @JsonProperty("millisToWaitBeforeDeleting") long millisToWaitBeforeDeleting, - @JsonProperty("mergeBytesLimit") long mergeBytesLimit, - @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, - @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, - @JsonProperty("replicantLifetime") int replicantLifetime, - @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, - @JsonProperty("balancerComputeThreads") int balancerComputeThreads, - @JsonProperty("emitBalancingStats") boolean emitBalancingStats, + @JacksonInject JacksonConfigManager configManager, + @JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting, + @JsonProperty("mergeBytesLimit") Long mergeBytesLimit, + @JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit, + @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove, + @JsonProperty("replicantLifetime") Integer replicantLifetime, + @JsonProperty("replicationThrottleLimit") Integer replicationThrottleLimit, + @JsonProperty("balancerComputeThreads") Integer balancerComputeThreads, + @JsonProperty("emitBalancingStats") Boolean emitBalancingStats, // Type is Object here so that we can support both string and list as // coordinator console can not send array of strings in the update request. // See https://github.com/druid-io/druid/issues/3055 @JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist, - @JsonProperty("killAllDataSources") boolean killAllDataSources + @JsonProperty("killAllDataSources") Boolean killAllDataSources + ) + { + CoordinatorDynamicConfig current = configManager.watch( + CoordinatorDynamicConfig.CONFIG_KEY, + CoordinatorDynamicConfig.class + ).get(); + if (current == null) { + current = new Builder().build(); + } + + this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting == null + ? current.getMillisToWaitBeforeDeleting() + : millisToWaitBeforeDeleting; + this.mergeBytesLimit = mergeBytesLimit == null ? current.getMergeBytesLimit() : mergeBytesLimit; + this.mergeSegmentsLimit = mergeSegmentsLimit == null ? current.getMergeSegmentsLimit() : mergeSegmentsLimit; + this.maxSegmentsToMove = maxSegmentsToMove == null ? current.getMaxSegmentsToMove() : maxSegmentsToMove; + this.replicantLifetime = replicantLifetime == null ? current.getReplicantLifetime() : replicantLifetime; + this.replicationThrottleLimit = replicationThrottleLimit == null + ? current.getReplicationThrottleLimit() + : replicationThrottleLimit; + this.balancerComputeThreads = Math.max( + balancerComputeThreads == null + ? current.getBalancerComputeThreads() + : balancerComputeThreads, 1 + ); + this.emitBalancingStats = emitBalancingStats == null ? current.emitBalancingStats() : emitBalancingStats; + + + this.killAllDataSources = killAllDataSources == null ? current.isKillAllDataSources() : killAllDataSources; + this.killDataSourceWhitelist = killDataSourceWhitelist == null + ? current.getKillDataSourceWhitelist() + : parseKillDataSourceWhitelist(killDataSourceWhitelist); + + if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) { + throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist"); + } + } + + public CoordinatorDynamicConfig( + long millisToWaitBeforeDeleting, + long mergeBytesLimit, + int mergeSegmentsLimit, + int maxSegmentsToMove, + int replicantLifetime, + int replicationThrottleLimit, + int balancerComputeThreads, + boolean emitBalancingStats, + Object killDataSourceWhitelist, + boolean killAllDataSources ) { this.maxSegmentsToMove = maxSegmentsToMove; @@ -69,29 +120,33 @@ public class CoordinatorDynamicConfig this.replicationThrottleLimit = replicationThrottleLimit; this.emitBalancingStats = emitBalancingStats; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); - this.killAllDataSources = killAllDataSources; - - if (killDataSourceWhitelist instanceof String) { - String[] tmp = ((String) killDataSourceWhitelist).split(","); - this.killDataSourceWhitelist = new HashSet<>(); - for (int i = 0; i < tmp.length; i++) { - String trimmed = tmp[i].trim(); - if (!trimmed.isEmpty()) { - this.killDataSourceWhitelist.add(trimmed); - } - } - } else if (killDataSourceWhitelist instanceof Collection){ - this.killDataSourceWhitelist = ImmutableSet.copyOf(((Collection) killDataSourceWhitelist)); - } else { - this.killDataSourceWhitelist = ImmutableSet.of(); - } + this.killDataSourceWhitelist = parseKillDataSourceWhitelist(killDataSourceWhitelist); if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) { throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist"); } } + private Set parseKillDataSourceWhitelist(Object killDataSourceWhitelist) + { + if (killDataSourceWhitelist instanceof String) { + String[] tmp = ((String) killDataSourceWhitelist).split(","); + Set result = new HashSet<>(); + for (int i = 0; i < tmp.length; i++) { + String trimmed = tmp[i].trim(); + if (!trimmed.isEmpty()) { + result.add(trimmed); + } + } + return result; + } else if (killDataSourceWhitelist instanceof Collection) { + return ImmutableSet.copyOf(((Collection) killDataSourceWhitelist)); + } else { + return ImmutableSet.of(); + } + } + @JsonProperty public long getMillisToWaitBeforeDeleting() { diff --git a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java index e81549b6595..cf6f022c2aa 100644 --- a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java @@ -19,20 +19,44 @@ package io.druid.server.http; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; - +import io.druid.common.config.JacksonConfigManager; +import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.IAE; -import io.druid.segment.TestHelper; import io.druid.server.coordinator.CoordinatorDynamicConfig; +import org.easymock.EasyMock; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.util.concurrent.atomic.AtomicReference; + /** */ public class CoordinatorDynamicConfigTest { + private JacksonConfigManager configManager; + private ObjectMapper mapper; + + @Before + public void setup() + { + mapper = new DefaultObjectMapper(); + configManager = EasyMock.mock(JacksonConfigManager.class); + EasyMock.expect( + configManager.watch( + CoordinatorDynamicConfig.CONFIG_KEY, + CoordinatorDynamicConfig.class + ) + ).andReturn(new AtomicReference<>(null)).anyTimes(); + EasyMock.replay(configManager); + InjectableValues inject = new InjectableValues.Std().addValue(JacksonConfigManager.class, configManager); + mapper.setInjectableValues(inject); + } + @Test public void testSerde() throws Exception { @@ -48,7 +72,6 @@ public class CoordinatorDynamicConfigTest + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"]\n" + "}\n"; - ObjectMapper mapper = TestHelper.getObjectMapper(); CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -80,7 +103,6 @@ public class CoordinatorDynamicConfigTest + " \"killDataSourceWhitelist\": \" test1 ,test2 \"\n" + "}\n"; - ObjectMapper mapper = TestHelper.getObjectMapper(); CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -112,7 +134,6 @@ public class CoordinatorDynamicConfigTest + " \"killAllDataSources\": true\n" + "}\n"; - ObjectMapper mapper = TestHelper.getObjectMapper(); CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -156,6 +177,21 @@ public class CoordinatorDynamicConfigTest ); } + @Test + public void testUpdate() + { + CoordinatorDynamicConfig current = new CoordinatorDynamicConfig(99, 99, 99, 99, 99, 99, 99, true, ImmutableSet.of("x"), false); + JacksonConfigManager mock = EasyMock.mock(JacksonConfigManager.class); + EasyMock.expect(mock.watch(CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.class)).andReturn( + new AtomicReference<>(current) + ); + EasyMock.replay(mock); + Assert.assertEquals( + current, + new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null) + ); + } + @Test public void testEqualsAndHashCodeSanity() {