Merge pull request #524 from metamx/fix-router2

Small fixes to router
This commit is contained in:
fjy 2014-04-30 11:21:45 -06:00
commit 159c3361b8
4 changed files with 22 additions and 12 deletions

View File

@ -79,7 +79,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.9.2</version>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -94,6 +94,6 @@ public class PeriodLoadRule extends LoadRule
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
final Interval currInterval = new Interval(period, referenceTimestamp);
return currInterval.overlaps(interval);
return currInterval.contains(interval);
}
}

View File

@ -30,8 +30,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.client.selector.Server;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServerDiscoverySelector;
@ -39,6 +39,7 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.Rule;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
import java.net.URL;
@ -60,7 +61,7 @@ public class CoordinatorRuleManager
private final Supplier<TieredBrokerConfig> config;
private final ServerDiscoverySelector selector;
private final StatusResponseHandler responseHandler;
private final FullResponseHandler responseHandler;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
private volatile ScheduledExecutorService exec;
@ -82,7 +83,7 @@ public class CoordinatorRuleManager
this.config = config;
this.selector = selector;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
this.responseHandler = new FullResponseHandler(Charsets.UTF_8);
this.rules = new AtomicReference<>(
new ConcurrentHashMap<String, List<Rule>>()
);
@ -137,6 +138,7 @@ public class CoordinatorRuleManager
return started;
}
@SuppressWarnings("unchecked")
public void poll()
{
try {
@ -145,11 +147,19 @@ public class CoordinatorRuleManager
return;
}
StatusResponseHolder response = httpClient.get(new URL(url))
.go(responseHandler)
.get();
FullResponseHolder response = httpClient.get(new URL(url))
.go(responseHandler)
.get();
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
url = response.getResponse().getHeader("Location");
log.info("Redirecting rule request to [%s]", url);
response = httpClient.get(new URL(url))
.go(responseHandler)
.get();
}
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(
(Map<String, List<Rule>>) jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
{

View File

@ -49,7 +49,7 @@ public class PeriodLoadRuleTest
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build(), now));
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build(), now));
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
Assert.assertFalse(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
}
@Test
@ -64,7 +64,7 @@ public class PeriodLoadRuleTest
);
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build(), now));
Assert.assertTrue(
Assert.assertFalse(
rule.appliesTo(
builder.interval(new Interval(now.minusDays(1), now.plusDays(1)))
.build(),