From 68242950f7ff86efee8db2696666d07b30308351 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 30 Apr 2014 10:11:25 -0700 Subject: [PATCH] make coordinator routing smarter and period load rule explicit --- pom.xml | 2 +- .../coordinator/rules/PeriodLoadRule.java | 2 +- .../server/router/CoordinatorRuleManager.java | 26 +++++++++++++------ .../coordinator/rules/PeriodLoadRuleTest.java | 4 +-- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 87bf8b6228c..b51fbb317ef 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ com.metamx http-client - 0.9.2 + 0.9.4 com.metamx diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java index e996fa9e464..3f650c899bf 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java @@ -94,6 +94,6 @@ public class PeriodLoadRule extends LoadRule public boolean appliesTo(Interval interval, DateTime referenceTimestamp) { final Interval currInterval = new Interval(period, referenceTimestamp); - return currInterval.overlaps(interval); + return currInterval.contains(interval); } } diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index adf8e6fc48e..bcd5ba94176 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -30,8 +30,8 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; +import com.metamx.http.client.response.FullResponseHandler; +import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.selector.Server; import io.druid.concurrent.Execs; import io.druid.curator.discovery.ServerDiscoverySelector; @@ -39,6 +39,7 @@ import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.server.coordinator.rules.Rule; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Duration; import java.net.URL; @@ -60,7 +61,7 @@ public class CoordinatorRuleManager private final Supplier config; private final ServerDiscoverySelector selector; - private final StatusResponseHandler responseHandler; + private final FullResponseHandler responseHandler; private final AtomicReference>> rules; private volatile ScheduledExecutorService exec; @@ -82,7 +83,7 @@ public class CoordinatorRuleManager this.config = config; this.selector = selector; - this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); + this.responseHandler = new FullResponseHandler(Charsets.UTF_8); this.rules = new AtomicReference<>( new ConcurrentHashMap>() ); @@ -137,6 +138,7 @@ public class CoordinatorRuleManager return started; } + @SuppressWarnings("unchecked") public void poll() { try { @@ -145,11 +147,19 @@ public class CoordinatorRuleManager return; } - StatusResponseHolder response = httpClient.get(new URL(url)) - .go(responseHandler) - .get(); + FullResponseHolder response = httpClient.get(new URL(url)) + .go(responseHandler) + .get(); - ConcurrentHashMap> newRules = new ConcurrentHashMap>( + if (response.getStatus().equals(HttpResponseStatus.FOUND)) { + url = response.getResponse().getHeader("Location"); + log.info("Redirecting rule request to [%s]", url); + response = httpClient.get(new URL(url)) + .go(responseHandler) + .get(); + } + + ConcurrentHashMap> newRules = new ConcurrentHashMap<>( (Map>) jsonMapper.readValue( response.getContent(), new TypeReference>>() { diff --git a/server/src/test/java/io/druid/server/coordinator/rules/PeriodLoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/PeriodLoadRuleTest.java index afd255111eb..63d33844daa 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/PeriodLoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/PeriodLoadRuleTest.java @@ -49,7 +49,7 @@ public class PeriodLoadRuleTest Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build(), now)); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build(), now)); - Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now)); + Assert.assertFalse(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now)); } @Test @@ -64,7 +64,7 @@ public class PeriodLoadRuleTest ); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build(), now)); - Assert.assertTrue( + Assert.assertFalse( rule.appliesTo( builder.interval(new Interval(now.minusDays(1), now.plusDays(1))) .build(),