Add unit tests for CoordinatorRuleManager (#9318)

This commit is contained in:
Jihoon Son 2020-02-13 19:29:57 -08:00 committed by GitHub
parent e9aebd994a
commit bcf8f91e46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 222 additions and 26 deletions

View File

@ -28,6 +28,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -76,4 +77,23 @@ public class ForeverLoadRule extends LoadRule
{
return true;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ForeverLoadRule that = (ForeverLoadRule) o;
return Objects.equals(tieredReplicants, that.tieredReplicants);
}
@Override
public int hashCode()
{
return Objects.hash(tieredReplicants);
}
}

View File

@ -25,6 +25,8 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Objects;
/**
*/
public class IntervalDropRule extends DropRule
@ -63,4 +65,23 @@ public class IntervalDropRule extends DropRule
{
return interval.contains(theInterval);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalDropRule that = (IntervalDropRule) o;
return Objects.equals(interval, that.interval);
}
@Override
public int hashCode()
{
return Objects.hash(interval);
}
}

View File

@ -21,7 +21,9 @@ package org.apache.druid.server.router;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.discovery.DruidLeaderClient;
@ -40,6 +42,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -48,25 +51,30 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
@ManageLifecycle
public class CoordinatorRuleManager
{
private static final Logger log = new Logger(CoordinatorRuleManager.class);
private static final Logger LOG = new Logger(CoordinatorRuleManager.class);
private static final TypeReference<Map<String, List<Rule>>> TYPE_REFERENCE =
new TypeReference<Map<String, List<Rule>>>()
{
};
private final ObjectMapper jsonMapper;
private final Supplier<TieredBrokerConfig> config;
private final AtomicReference<Map<String, List<Rule>>> rules;
private final DruidLeaderClient druidLeaderClient;
private volatile ScheduledExecutorService exec;
private final Object lock = new Object();
private volatile boolean started = false;
@GuardedBy("lock")
private ScheduledExecutorService exec;
@Inject
public CoordinatorRuleManager(
@Json ObjectMapper jsonMapper,
@ -78,9 +86,7 @@ public class CoordinatorRuleManager
this.config = config;
this.druidLeaderClient = druidLeaderClient;
this.rules = new AtomicReference<>(
Collections.emptyMap()
);
this.rules = new AtomicReference<>(Collections.emptyMap());
}
@LifecycleStart
@ -97,14 +103,7 @@ public class CoordinatorRuleManager
exec,
new Duration(0),
config.get().getPollPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
this::poll
);
started = true;
@ -132,7 +131,6 @@ public class CoordinatorRuleManager
return started;
}
@SuppressWarnings("unchecked")
public void poll()
{
try {
@ -148,16 +146,13 @@ public class CoordinatorRuleManager
);
}
rules.set(
Collections.unmodifiableMap(jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
{
}
))
);
final Map<String, List<Rule>> map = jsonMapper.readValue(response.getContent(), TYPE_REFERENCE);
final Map<String, List<Rule>> immutableMapBuilder = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, list) -> immutableMapBuilder.put(k, Collections.unmodifiableList(list)));
rules.set(Collections.unmodifiableMap(immutableMapBuilder));
}
catch (Exception e) {
log.error(e, "Exception while polling for rules");
LOG.error(e, "Exception while polling for rules");
}
}
@ -175,4 +170,14 @@ public class CoordinatorRuleManager
}
return rulesWithDefault;
}
/**
* Returns the current snapshot of the rules.
* This method should be used for only testing.
*/
@VisibleForTesting
Map<String, List<Rule>> getRules()
{
return rules.get();
}
}

View File

@ -36,6 +36,7 @@ public class TieredBrokerConfig
{
public static final String DEFAULT_COORDINATOR_SERVICE_NAME = "druid/coordinator";
public static final String DEFAULT_BROKER_SERVICE_NAME = "druid/broker";
public static final String DEFAULT_RULE_NAME = "_default";
@JsonProperty
@NotNull
@ -46,7 +47,7 @@ public class TieredBrokerConfig
@JsonProperty
@NotNull
private String defaultRule = "_default";
private String defaultRule = DEFAULT_RULE_NAME;
@JsonProperty
@NotNull

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.coordinator.rules.ForeverDropRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.PeriodLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CoordinatorRuleManagerTest
{
private static final String DATASOURCE1 = "datasource1";
private static final String DATASOURCE2 = "datasource2";
private static final List<Rule> DEFAULT_RULES = ImmutableList.of(
new ForeverLoadRule(ImmutableMap.of("__default", 2))
);
@org.junit.Rule
public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private final TieredBrokerConfig tieredBrokerConfig = new TieredBrokerConfig();
@Test
public void testAddingToRulesMapThrowingError()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
final Map<String, List<Rule>> rules = manager.getRules();
expectedException.expect(UnsupportedOperationException.class);
rules.put("testKey", Collections.emptyList());
}
@Test
public void testAddingToRulesListThrowingError()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final Map<String, List<Rule>> rules = manager.getRules();
expectedException.expect(UnsupportedOperationException.class);
rules.get(DATASOURCE1).add(new ForeverDropRule());
}
@Test
public void testGetRulesWithUnknownDatasourceReturningDefaultRule()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final List<Rule> rules = manager.getRulesWithDefault("unknown");
Assert.assertEquals(DEFAULT_RULES, rules);
}
@Test
public void testGetRulesWithKnownDatasourceReturningAllRulesWithDefaultRule()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final List<Rule> rules = manager.getRulesWithDefault(DATASOURCE2);
final List<Rule> expectedRules = new ArrayList<>();
expectedRules.add(new ForeverLoadRule(null));
expectedRules.add(new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02")));
expectedRules.addAll(DEFAULT_RULES);
Assert.assertEquals(expectedRules, rules);
}
private DruidLeaderClient mockClient()
{
final Map<String, List<Rule>> rules = ImmutableMap.of(
DATASOURCE1,
ImmutableList.of(new ForeverLoadRule(null)),
DATASOURCE2,
ImmutableList.of(new ForeverLoadRule(null), new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02"))),
"datasource3",
ImmutableList.of(
new PeriodLoadRule(new Period("P1M"), true, null),
new ForeverDropRule()
),
TieredBrokerConfig.DEFAULT_RULE_NAME,
ImmutableList.of(new ForeverLoadRule(ImmutableMap.of("__default", 2)))
);
final StringFullResponseHolder holder = EasyMock.niceMock(StringFullResponseHolder.class);
EasyMock.expect(holder.getStatus())
.andReturn(HttpResponseStatus.OK);
try {
EasyMock.expect(holder.getContent())
.andReturn(objectMapper.writeValueAsString(rules));
final DruidLeaderClient client = EasyMock.niceMock(DruidLeaderClient.class);
EasyMock.expect(client.go(EasyMock.anyObject()))
.andReturn(holder);
EasyMock.replay(holder, client);
return client;
}
catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}