mirror of https://github.com/apache/druid.git
re-implement updation of CoordinatorDynamicConfig to not call manager.watch(..) in its constructor (#4557)
* re-implement updation of CoordinatorDynamicConfig to not call manager.watch(..) in its constructor * address review comments
This commit is contained in:
parent
60cdf94677
commit
a862cc716f
|
@ -18,16 +18,15 @@
|
|||
*/
|
||||
package io.druid.server.coordinator;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.druid.common.config.JacksonConfigManager;
|
||||
import io.druid.java.util.common.IAE;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class CoordinatorDynamicConfig
|
||||
{
|
||||
|
@ -53,84 +52,31 @@ public class CoordinatorDynamicConfig
|
|||
|
||||
@JsonCreator
|
||||
public CoordinatorDynamicConfig(
|
||||
@JacksonInject JacksonConfigManager configManager,
|
||||
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
|
||||
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
|
||||
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
|
||||
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
|
||||
@JsonProperty("replicantLifetime") Integer replicantLifetime,
|
||||
@JsonProperty("replicationThrottleLimit") Integer replicationThrottleLimit,
|
||||
@JsonProperty("balancerComputeThreads") Integer balancerComputeThreads,
|
||||
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats,
|
||||
@JsonProperty("millisToWaitBeforeDeleting") long millisToWaitBeforeDeleting,
|
||||
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
|
||||
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
|
||||
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
|
||||
@JsonProperty("replicantLifetime") int replicantLifetime,
|
||||
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
|
||||
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
|
||||
@JsonProperty("emitBalancingStats") boolean emitBalancingStats,
|
||||
|
||||
// Type is Object here so that we can support both string and list as
|
||||
// coordinator console can not send array of strings in the update request.
|
||||
// See https://github.com/druid-io/druid/issues/3055
|
||||
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
|
||||
@JsonProperty("killAllDataSources") Boolean killAllDataSources,
|
||||
@JsonProperty("maxSegmentsInNodeLoadingQueue") Integer maxSegmentsInNodeLoadingQueue
|
||||
@JsonProperty("killAllDataSources") boolean killAllDataSources,
|
||||
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue
|
||||
)
|
||||
{
|
||||
CoordinatorDynamicConfig current = configManager.watch(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
CoordinatorDynamicConfig.class
|
||||
).get();
|
||||
if (current == null) {
|
||||
current = new Builder().build();
|
||||
}
|
||||
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting == null
|
||||
? current.getMillisToWaitBeforeDeleting()
|
||||
: millisToWaitBeforeDeleting;
|
||||
this.mergeBytesLimit = mergeBytesLimit == null ? current.getMergeBytesLimit() : mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit == null ? current.getMergeSegmentsLimit() : mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove == null ? current.getMaxSegmentsToMove() : maxSegmentsToMove;
|
||||
this.replicantLifetime = replicantLifetime == null ? current.getReplicantLifetime() : replicantLifetime;
|
||||
this.replicationThrottleLimit = replicationThrottleLimit == null
|
||||
? current.getReplicationThrottleLimit()
|
||||
: replicationThrottleLimit;
|
||||
this.balancerComputeThreads = Math.max(
|
||||
balancerComputeThreads == null
|
||||
? current.getBalancerComputeThreads()
|
||||
: balancerComputeThreads, 1
|
||||
);
|
||||
this.emitBalancingStats = emitBalancingStats == null ? current.emitBalancingStats() : emitBalancingStats;
|
||||
|
||||
|
||||
this.killAllDataSources = killAllDataSources == null ? current.isKillAllDataSources() : killAllDataSources;
|
||||
this.killDataSourceWhitelist = killDataSourceWhitelist == null
|
||||
? current.getKillDataSourceWhitelist()
|
||||
: parseKillDataSourceWhitelist(killDataSourceWhitelist);
|
||||
|
||||
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null ? current.getMaxSegmentsInNodeLoadingQueue() : maxSegmentsInNodeLoadingQueue;
|
||||
|
||||
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
|
||||
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
|
||||
}
|
||||
}
|
||||
|
||||
private CoordinatorDynamicConfig(
|
||||
long millisToWaitBeforeDeleting,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove,
|
||||
int replicantLifetime,
|
||||
int replicationThrottleLimit,
|
||||
int balancerComputeThreads,
|
||||
boolean emitBalancingStats,
|
||||
Object killDataSourceWhitelist,
|
||||
boolean killAllDataSources,
|
||||
int maxSegmentsInNodeLoadingQueue
|
||||
)
|
||||
{
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.mergeBytesLimit = mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
this.replicantLifetime = replicantLifetime;
|
||||
this.replicationThrottleLimit = replicationThrottleLimit;
|
||||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
|
||||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.killAllDataSources = killAllDataSources;
|
||||
this.killDataSourceWhitelist = parseKillDataSourceWhitelist(killDataSourceWhitelist);
|
||||
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
|
||||
|
@ -315,35 +261,46 @@ public class CoordinatorDynamicConfig
|
|||
|
||||
public static class Builder
|
||||
{
|
||||
private long millisToWaitBeforeDeleting;
|
||||
private long mergeBytesLimit;
|
||||
private int mergeSegmentsLimit;
|
||||
private int maxSegmentsToMove;
|
||||
private int replicantLifetime;
|
||||
private int replicationThrottleLimit;
|
||||
private boolean emitBalancingStats;
|
||||
private int balancerComputeThreads;
|
||||
private Set<String> killDataSourceWhitelist;
|
||||
private boolean killAllDataSources;
|
||||
private int maxSegmentsInNodeLoadingQueue;
|
||||
private final static long DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING = TimeUnit.MINUTES.toMillis(15);
|
||||
private final static long DEFAULT_MERGE_BYTES_LIMIT = 524288000L;
|
||||
private final static int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
|
||||
private final static int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
|
||||
private final static int DEFAULT_REPLICANT_LIFETIME = 15;
|
||||
private final static int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
|
||||
private final static int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
|
||||
private final static boolean DEFAULT_EMIT_BALANCING_STATS = false;
|
||||
private final static boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
|
||||
private final static int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
|
||||
|
||||
private Long millisToWaitBeforeDeleting;
|
||||
private Long mergeBytesLimit;
|
||||
private Integer mergeSegmentsLimit;
|
||||
private Integer maxSegmentsToMove;
|
||||
private Integer replicantLifetime;
|
||||
private Integer replicationThrottleLimit;
|
||||
private Boolean emitBalancingStats;
|
||||
private Integer balancerComputeThreads;
|
||||
private Object killDataSourceWhitelist;
|
||||
private Boolean killAllDataSources;
|
||||
private Integer maxSegmentsInNodeLoadingQueue;
|
||||
|
||||
public Builder()
|
||||
{
|
||||
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false, 0);
|
||||
}
|
||||
|
||||
private Builder(
|
||||
long millisToWaitBeforeDeleting,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove,
|
||||
int replicantLifetime,
|
||||
int replicationThrottleLimit,
|
||||
int balancerComputeThreads,
|
||||
boolean emitBalancingStats,
|
||||
Set<String> killDataSourceWhitelist,
|
||||
boolean killAllDataSources,
|
||||
int maxSegmentsInNodeLoadingQueue
|
||||
@JsonCreator
|
||||
public Builder(
|
||||
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
|
||||
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
|
||||
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
|
||||
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
|
||||
@JsonProperty("replicantLifetime") Integer replicantLifetime,
|
||||
@JsonProperty("replicationThrottleLimit") Integer replicationThrottleLimit,
|
||||
@JsonProperty("balancerComputeThreads") Integer balancerComputeThreads,
|
||||
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats,
|
||||
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
|
||||
@JsonProperty("killAllDataSources") Boolean killAllDataSources,
|
||||
@JsonProperty("maxSegmentsInNodeLoadingQueue") Integer maxSegmentsInNodeLoadingQueue
|
||||
)
|
||||
{
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
|
||||
|
@ -352,10 +309,10 @@ public class CoordinatorDynamicConfig
|
|||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
this.replicantLifetime = replicantLifetime;
|
||||
this.replicationThrottleLimit = replicationThrottleLimit;
|
||||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.balancerComputeThreads = balancerComputeThreads;
|
||||
this.killDataSourceWhitelist = killDataSourceWhitelist;
|
||||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.killAllDataSources = killAllDataSources;
|
||||
this.killDataSourceWhitelist = killDataSourceWhitelist;
|
||||
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
|
||||
}
|
||||
|
||||
|
@ -425,20 +382,40 @@ public class CoordinatorDynamicConfig
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public CoordinatorDynamicConfig build()
|
||||
{
|
||||
return new CoordinatorDynamicConfig(
|
||||
millisToWaitBeforeDeleting,
|
||||
mergeBytesLimit,
|
||||
mergeSegmentsLimit,
|
||||
maxSegmentsToMove,
|
||||
replicantLifetime,
|
||||
replicationThrottleLimit,
|
||||
balancerComputeThreads,
|
||||
emitBalancingStats,
|
||||
millisToWaitBeforeDeleting == null ? DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING : millisToWaitBeforeDeleting,
|
||||
mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit,
|
||||
mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit,
|
||||
maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
|
||||
replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime,
|
||||
replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
|
||||
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
|
||||
emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats,
|
||||
killDataSourceWhitelist,
|
||||
killAllDataSources,
|
||||
maxSegmentsInNodeLoadingQueue
|
||||
killAllDataSources == null ? DEFAULT_KILL_ALL_DATA_SOURCES : killAllDataSources,
|
||||
maxSegmentsInNodeLoadingQueue == null ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE : maxSegmentsInNodeLoadingQueue
|
||||
);
|
||||
}
|
||||
|
||||
public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
|
||||
{
|
||||
return new CoordinatorDynamicConfig(
|
||||
millisToWaitBeforeDeleting == null ? defaults.getMillisToWaitBeforeDeleting() : millisToWaitBeforeDeleting,
|
||||
mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
|
||||
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
|
||||
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
|
||||
replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
|
||||
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
|
||||
balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,
|
||||
emitBalancingStats == null ? defaults.emitBalancingStats() : emitBalancingStats,
|
||||
killDataSourceWhitelist == null ? defaults.getKillDataSourceWhitelist() : killDataSourceWhitelist,
|
||||
killAllDataSources == null ? defaults.isKillAllDataSources() : killAllDataSources,
|
||||
maxSegmentsInNodeLoadingQueue == null ? defaults.getMaxSegmentsInNodeLoadingQueue() : maxSegmentsInNodeLoadingQueue
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,15 +76,20 @@ public class CoordinatorDynamicConfigsResource
|
|||
// default value is used for backwards compatibility
|
||||
@POST
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
public Response setDynamicConfigs(final CoordinatorDynamicConfig dynamicConfig,
|
||||
public Response setDynamicConfigs(final CoordinatorDynamicConfig.Builder dynamicConfigBuilder,
|
||||
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
|
||||
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
|
||||
@Context HttpServletRequest req
|
||||
)
|
||||
{
|
||||
CoordinatorDynamicConfig current = manager.watch(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
CoordinatorDynamicConfig.class
|
||||
).get();
|
||||
|
||||
if (!manager.set(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
dynamicConfig,
|
||||
current == null ? dynamicConfigBuilder.build() : dynamicConfigBuilder.build(current),
|
||||
new AuditInfo(author, comment, req.getRemoteAddr())
|
||||
)) {
|
||||
return Response.status(Response.Status.BAD_REQUEST).build();
|
||||
|
|
|
@ -19,44 +19,22 @@
|
|||
|
||||
package io.druid.server.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import io.druid.common.config.JacksonConfigManager;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CoordinatorDynamicConfigTest
|
||||
{
|
||||
private JacksonConfigManager configManager;
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
mapper = new DefaultObjectMapper();
|
||||
configManager = EasyMock.mock(JacksonConfigManager.class);
|
||||
EasyMock.expect(
|
||||
configManager.watch(
|
||||
CoordinatorDynamicConfig.CONFIG_KEY,
|
||||
CoordinatorDynamicConfig.class
|
||||
)
|
||||
).andReturn(new AtomicReference<>(null)).anyTimes();
|
||||
EasyMock.replay(configManager);
|
||||
InjectableValues inject = new InjectableValues.Std().addValue(JacksonConfigManager.class, configManager);
|
||||
mapper.setInjectableValues(inject);
|
||||
}
|
||||
private final ObjectMapper mapper = TestHelper.getJsonMapper();
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
|
@ -201,14 +179,10 @@ public class CoordinatorDynamicConfigTest
|
|||
CoordinatorDynamicConfig current = CoordinatorDynamicConfig.builder()
|
||||
.withKillDataSourceWhitelist(ImmutableSet.of("x"))
|
||||
.build();
|
||||
JacksonConfigManager mock = EasyMock.mock(JacksonConfigManager.class);
|
||||
EasyMock.expect(mock.watch(CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.class)).andReturn(
|
||||
new AtomicReference<>(current)
|
||||
);
|
||||
EasyMock.replay(mock);
|
||||
|
||||
Assert.assertEquals(
|
||||
current,
|
||||
new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null, null)
|
||||
new CoordinatorDynamicConfig.Builder(null, null, null, null, null, null, null, null, null, null, null).build(current)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue