mirror of https://github.com/apache/druid.git
make coordinator routing smarter and period load rule explicit
This commit is contained in:
parent
ffd37bc7bd
commit
68242950f7
2
pom.xml
2
pom.xml
|
@ -79,7 +79,7 @@
|
|||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>http-client</artifactId>
|
||||
<version>0.9.2</version>
|
||||
<version>0.9.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
|
|
|
@ -94,6 +94,6 @@ public class PeriodLoadRule extends LoadRule
|
|||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
final Interval currInterval = new Interval(period, referenceTimestamp);
|
||||
return currInterval.overlaps(interval);
|
||||
return currInterval.contains(interval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
|
|||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import com.metamx.http.client.response.FullResponseHandler;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
|
@ -39,6 +39,7 @@ import io.druid.guice.ManageLifecycle;
|
|||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import java.net.URL;
|
||||
|
@ -60,7 +61,7 @@ public class CoordinatorRuleManager
|
|||
private final Supplier<TieredBrokerConfig> config;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
private final StatusResponseHandler responseHandler;
|
||||
private final FullResponseHandler responseHandler;
|
||||
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
|
||||
|
||||
private volatile ScheduledExecutorService exec;
|
||||
|
@ -82,7 +83,7 @@ public class CoordinatorRuleManager
|
|||
this.config = config;
|
||||
this.selector = selector;
|
||||
|
||||
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
|
||||
this.responseHandler = new FullResponseHandler(Charsets.UTF_8);
|
||||
this.rules = new AtomicReference<>(
|
||||
new ConcurrentHashMap<String, List<Rule>>()
|
||||
);
|
||||
|
@ -137,6 +138,7 @@ public class CoordinatorRuleManager
|
|||
return started;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void poll()
|
||||
{
|
||||
try {
|
||||
|
@ -145,11 +147,19 @@ public class CoordinatorRuleManager
|
|||
return;
|
||||
}
|
||||
|
||||
StatusResponseHolder response = httpClient.get(new URL(url))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
FullResponseHolder response = httpClient.get(new URL(url))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
|
||||
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
|
||||
if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
|
||||
url = response.getResponse().getHeader("Location");
|
||||
log.info("Redirecting rule request to [%s]", url);
|
||||
response = httpClient.get(new URL(url))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
}
|
||||
|
||||
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(
|
||||
(Map<String, List<Rule>>) jsonMapper.readValue(
|
||||
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
|
||||
{
|
||||
|
|
|
@ -49,7 +49,7 @@ public class PeriodLoadRuleTest
|
|||
|
||||
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build(), now));
|
||||
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build(), now));
|
||||
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
|
||||
Assert.assertFalse(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -64,7 +64,7 @@ public class PeriodLoadRuleTest
|
|||
);
|
||||
|
||||
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build(), now));
|
||||
Assert.assertTrue(
|
||||
Assert.assertFalse(
|
||||
rule.appliesTo(
|
||||
builder.interval(new Interval(now.minusDays(1), now.plusDays(1)))
|
||||
.build(),
|
||||
|
|
Loading…
Reference in New Issue