mirror of https://github.com/apache/druid.git
coordinator dynamic config POST to update only explicitly specified fields (#4141)
* coordinator dynamic config POST to update only explicitly specified fields instead of resetting everything else to zeros * address review comments
This commit is contained in:
parent
54463941b9
commit
9b9e1cfecb
|
@ -18,10 +18,11 @@
|
|||
*/
|
||||
package io.druid.server.coordinator;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import io.druid.common.config.JacksonConfigManager;
|
||||
import io.druid.java.util.common.IAE;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -45,20 +46,70 @@ public class CoordinatorDynamicConfig
|
|||
|
||||
@JsonCreator
|
||||
public CoordinatorDynamicConfig(
|
||||
@JsonProperty("millisToWaitBeforeDeleting") long millisToWaitBeforeDeleting,
|
||||
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
|
||||
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
|
||||
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
|
||||
@JsonProperty("replicantLifetime") int replicantLifetime,
|
||||
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
|
||||
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
|
||||
@JsonProperty("emitBalancingStats") boolean emitBalancingStats,
|
||||
@JacksonInject JacksonConfigManager configManager,
|
||||
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
|
||||
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
|
||||
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
|
||||
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
|
||||
@JsonProperty("replicantLifetime") Integer replicantLifetime,
|
||||
@JsonProperty("replicationThrottleLimit") Integer replicationThrottleLimit,
|
||||
@JsonProperty("balancerComputeThreads") Integer balancerComputeThreads,
|
||||
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats,
|
||||
|
||||
// Type is Object here so that we can support both string and list as
|
||||
// coordinator console can not send array of strings in the update request.
|
||||
// See https://github.com/druid-io/druid/issues/3055
|
||||
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
|
||||
@JsonProperty("killAllDataSources") boolean killAllDataSources
|
||||
@JsonProperty("killAllDataSources") Boolean killAllDataSources
|
||||
)
|
||||
{
|
||||
CoordinatorDynamicConfig current = configManager.watch(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
CoordinatorDynamicConfig.class
|
||||
).get();
|
||||
if (current == null) {
|
||||
current = new Builder().build();
|
||||
}
|
||||
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting == null
|
||||
? current.getMillisToWaitBeforeDeleting()
|
||||
: millisToWaitBeforeDeleting;
|
||||
this.mergeBytesLimit = mergeBytesLimit == null ? current.getMergeBytesLimit() : mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit == null ? current.getMergeSegmentsLimit() : mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove == null ? current.getMaxSegmentsToMove() : maxSegmentsToMove;
|
||||
this.replicantLifetime = replicantLifetime == null ? current.getReplicantLifetime() : replicantLifetime;
|
||||
this.replicationThrottleLimit = replicationThrottleLimit == null
|
||||
? current.getReplicationThrottleLimit()
|
||||
: replicationThrottleLimit;
|
||||
this.balancerComputeThreads = Math.max(
|
||||
balancerComputeThreads == null
|
||||
? current.getBalancerComputeThreads()
|
||||
: balancerComputeThreads, 1
|
||||
);
|
||||
this.emitBalancingStats = emitBalancingStats == null ? current.emitBalancingStats() : emitBalancingStats;
|
||||
|
||||
|
||||
this.killAllDataSources = killAllDataSources == null ? current.isKillAllDataSources() : killAllDataSources;
|
||||
this.killDataSourceWhitelist = killDataSourceWhitelist == null
|
||||
? current.getKillDataSourceWhitelist()
|
||||
: parseKillDataSourceWhitelist(killDataSourceWhitelist);
|
||||
|
||||
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
|
||||
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
|
||||
}
|
||||
}
|
||||
|
||||
public CoordinatorDynamicConfig(
|
||||
long millisToWaitBeforeDeleting,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove,
|
||||
int replicantLifetime,
|
||||
int replicationThrottleLimit,
|
||||
int balancerComputeThreads,
|
||||
boolean emitBalancingStats,
|
||||
Object killDataSourceWhitelist,
|
||||
boolean killAllDataSources
|
||||
)
|
||||
{
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
|
@ -69,29 +120,33 @@ public class CoordinatorDynamicConfig
|
|||
this.replicationThrottleLimit = replicationThrottleLimit;
|
||||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
|
||||
|
||||
this.killAllDataSources = killAllDataSources;
|
||||
|
||||
if (killDataSourceWhitelist instanceof String) {
|
||||
String[] tmp = ((String) killDataSourceWhitelist).split(",");
|
||||
this.killDataSourceWhitelist = new HashSet<>();
|
||||
for (int i = 0; i < tmp.length; i++) {
|
||||
String trimmed = tmp[i].trim();
|
||||
if (!trimmed.isEmpty()) {
|
||||
this.killDataSourceWhitelist.add(trimmed);
|
||||
}
|
||||
}
|
||||
} else if (killDataSourceWhitelist instanceof Collection){
|
||||
this.killDataSourceWhitelist = ImmutableSet.copyOf(((Collection) killDataSourceWhitelist));
|
||||
} else {
|
||||
this.killDataSourceWhitelist = ImmutableSet.of();
|
||||
}
|
||||
this.killDataSourceWhitelist = parseKillDataSourceWhitelist(killDataSourceWhitelist);
|
||||
|
||||
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
|
||||
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
|
||||
}
|
||||
}
|
||||
|
||||
private Set<String> parseKillDataSourceWhitelist(Object killDataSourceWhitelist)
|
||||
{
|
||||
if (killDataSourceWhitelist instanceof String) {
|
||||
String[] tmp = ((String) killDataSourceWhitelist).split(",");
|
||||
Set<String> result = new HashSet<>();
|
||||
for (int i = 0; i < tmp.length; i++) {
|
||||
String trimmed = tmp[i].trim();
|
||||
if (!trimmed.isEmpty()) {
|
||||
result.add(trimmed);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} else if (killDataSourceWhitelist instanceof Collection) {
|
||||
return ImmutableSet.copyOf(((Collection) killDataSourceWhitelist));
|
||||
} else {
|
||||
return ImmutableSet.of();
|
||||
}
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getMillisToWaitBeforeDeleting()
|
||||
{
|
||||
|
|
|
@ -19,20 +19,44 @@
|
|||
|
||||
package io.druid.server.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import io.druid.common.config.JacksonConfigManager;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CoordinatorDynamicConfigTest
|
||||
{
|
||||
private JacksonConfigManager configManager;
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
mapper = new DefaultObjectMapper();
|
||||
configManager = EasyMock.mock(JacksonConfigManager.class);
|
||||
EasyMock.expect(
|
||||
configManager.watch(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
CoordinatorDynamicConfig.class
|
||||
)
|
||||
).andReturn(new AtomicReference<>(null)).anyTimes();
|
||||
EasyMock.replay(configManager);
|
||||
InjectableValues inject = new InjectableValues.Std().addValue(JacksonConfigManager.class, configManager);
|
||||
mapper.setInjectableValues(inject);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
|
@ -48,7 +72,6 @@ public class CoordinatorDynamicConfigTest
|
|||
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"]\n"
|
||||
+ "}\n";
|
||||
|
||||
ObjectMapper mapper = TestHelper.getObjectMapper();
|
||||
CoordinatorDynamicConfig actual = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
|
@ -80,7 +103,6 @@ public class CoordinatorDynamicConfigTest
|
|||
+ " \"killDataSourceWhitelist\": \" test1 ,test2 \"\n"
|
||||
+ "}\n";
|
||||
|
||||
ObjectMapper mapper = TestHelper.getObjectMapper();
|
||||
CoordinatorDynamicConfig actual = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
|
@ -112,7 +134,6 @@ public class CoordinatorDynamicConfigTest
|
|||
+ " \"killAllDataSources\": true\n"
|
||||
+ "}\n";
|
||||
|
||||
ObjectMapper mapper = TestHelper.getObjectMapper();
|
||||
CoordinatorDynamicConfig actual = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
|
@ -156,6 +177,21 @@ public class CoordinatorDynamicConfigTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate()
|
||||
{
|
||||
CoordinatorDynamicConfig current = new CoordinatorDynamicConfig(99, 99, 99, 99, 99, 99, 99, true, ImmutableSet.of("x"), false);
|
||||
JacksonConfigManager mock = EasyMock.mock(JacksonConfigManager.class);
|
||||
EasyMock.expect(mock.watch(CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.class)).andReturn(
|
||||
new AtomicReference<>(current)
|
||||
);
|
||||
EasyMock.replay(mock);
|
||||
Assert.assertEquals(
|
||||
current,
|
||||
new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualsAndHashCodeSanity()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue