diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 1435578a3a1..51619776878 100644 --- a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -182,3 +182,41 @@ Returns the dimensions of the datasource. * `/druid/v2/datasources/{dataSourceName}/metrics` Returns the metrics of the datasource. + +Router as Management Proxy +-------------------------- + +The router can be configured to forward requests to the active coordinator or overlord node. This may be useful for +setting up a highly available cluster in situations where the HTTP redirect mechanism of the inactive -> active +coordinator/overlord does not function correctly (servers are behind a load balancer, the hostname used in the redirect +is only resolvable internally, etc.). + +### Enabling the Management Proxy + +To enable this functionality, set the following in the router's runtime.properties: + +``` +druid.router.managementProxy.enabled=true +``` + +### Routing + +The management proxy supports implicit and explicit routes. Implicit routes are those where the destination can be +determined from the original request path based on Druid API path conventions. For the coordinator the convention is +`/druid/coordinator/*` and for the overlord the convention is `/druid/indexer/*`. These are convenient because they mean +that using the management proxy does not require modifying the API request other than issuing the request to the router +instead of the coordinator or overlord. Most Druid API requests can be routed implicitly. + +Explicit routes are those where the request to the router contains a path prefix indicating which node the request +should be routed to. For the coordinator this prefix is `/proxy/coordinator` and for the overlord it is `/proxy/overlord`. +This is required for API calls with an ambiguous destination. For example, the `/status` API is present on all Druid +nodes, so explicit routing needs to be used to indicate the proxy destination. + +This is summarized in the table below: + +|Request Route|Destination|Rewritten Route|Example| +|-------------|-----------|---------------|-------| +|`/druid/coordinator/*`|Coordinator|`/druid/coordinator/*`|`router:8888/druid/coordinator/v1/datasources` -> `coordinator:8081/druid/coordinator/v1/datasources`| +|`/druid/indexer/*`|Overlord|`/druid/indexer/*`|`router:8888/druid/indexer/v1/task` -> `overlord:8090/druid/indexer/v1/task`| +|`/proxy/coordinator/*`|Coordinator|`/*`|`router:8888/proxy/coordinator/status` -> `coordinator:8081/status`| +|`/proxy/overlord/*`|Overlord|`/*`|`router:8888/proxy/overlord/druid/indexer/v1/isLeader` -> `overlord:8090/druid/indexer/v1/isLeader`| diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index 9b51a76580e..fe4667b9c2d 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -21,7 +21,6 @@ package io.druid.curator.discovery; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.concurrent.LifecycleLock; import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.annotations.Self; @@ -29,6 +28,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.server.DruidNode; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; @@ -38,7 +38,6 @@ import org.apache.curator.framework.recipes.leader.Participant; import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -66,13 +65,21 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector this.curator = curator; this.self = self; this.latchPath = latchPath; + + // Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership + // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current + // leader without being involved in the election. + this.leaderLatch.set(createNewLeaderLatch()); } private LeaderLatch createNewLeaderLatch() { - final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse() - ); + return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse()); + } + + private LeaderLatch createNewLeaderLatchWithListener() + { + final LeaderLatch newLeaderLatch = createNewLeaderLatch(); newLeaderLatch.addListener( new LeaderLatchListener() @@ -94,7 +101,7 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); // give others a chance to become leader. - final LeaderLatch oldLatch = createNewLeaderLatch(); + final LeaderLatch oldLatch = createNewLeaderLatchWithListener(); CloseQuietly.close(oldLatch); leader = false; try { @@ -138,10 +145,6 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector @Override public String getCurrentLeader() { - if (!lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - throw new ISE("not started"); - } - try { final LeaderLatch latch = leaderLatch.get(); @@ -181,7 +184,7 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector this.listener = listener; this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)); - createNewLeaderLatch(); + createNewLeaderLatchWithListener(); leaderLatch.get().start(); lifecycleLock.started(); diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java index 02d63dd32a8..69c6c083e2f 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -26,7 +26,7 @@ import javax.annotation.Nullable; * which expect appropriate implementation available in guice annotated with @IndexingService and @Coordinator * respectively. * - * Usage is as follow. + * Usage is as follows: * On lifecycle start: * druidLeaderSelector.registerListener(myListener); * @@ -35,7 +35,6 @@ import javax.annotation.Nullable; */ public interface DruidLeaderSelector { - /** * Get ID of current Leader. Returns NULL if it can't find the leader. * Note that it is possible for leadership to change right after this call returns, so caller would get wrong @@ -73,14 +72,14 @@ public interface DruidLeaderSelector interface Listener { /** - * Notification that this node should start activities to be done by the leader. if this method throws - * exception then implementation would try to resign its leadership in the underlying system such as curator. + * Notification that this node should start activities to be done by the leader. If this method throws an + * exception, the implementation should resign leadership in the underlying system such as curator. */ void becomeLeader(); /** - * Notification that shid node should stop acitivities which are done by the leader. If this method throws - * exception then an alert is created. + * Notification that this node should stop activities which are done by the leader. If this method throws + * an exception, an alert should be created. */ void stopBeingLeader(); } diff --git a/server/src/main/java/io/druid/server/AsyncManagementForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncManagementForwardingServlet.java new file mode 100644 index 00000000000..21d952c7449 --- /dev/null +++ b/server/src/main/java/io/druid/server/AsyncManagementForwardingServlet.java @@ -0,0 +1,183 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import com.google.inject.Provider; +import io.druid.client.coordinator.Coordinator; +import io.druid.client.indexing.IndexingService; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Json; +import io.druid.guice.http.DruidHttpClientConfig; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.server.security.AuthConfig; +import org.apache.http.client.utils.URIBuilder; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.proxy.AsyncProxyServlet; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; + +public class AsyncManagementForwardingServlet extends AsyncProxyServlet +{ + private static final EmittingLogger log = new EmittingLogger(AsyncManagementForwardingServlet.class); + + private static final String BASE_URI_ATTRIBUTE = "io.druid.proxy.to.base.uri"; + private static final String MODIFIED_PATH_ATTRIBUTE = "io.druid.proxy.to.path"; + + // These are the typical path conventions for the coordinator and overlord APIs. If we see one of these paths, we will + // forward the request with the path unmodified, e.g.: + // Client Request: https://{ROUTER_HOST}:9088/druid/coordinator/v1/loadstatus?full + // Proxy Request: https://{COORDINATOR_HOST}:8281/druid/coordinator/v1/loadstatus?full + private static final String STANDARD_COORDINATOR_BASE_PATH = "/druid/coordinator"; + private static final String STANDARD_OVERLORD_BASE_PATH = "/druid/indexer"; + + // But there are some cases where the path is either ambiguous or collides with other servlet pathSpecs and where it + // is desirable to explicitly state the destination host. In these cases, we will forward the request with the proxy + // destination component of the path stripped, e.g.: + // Client Request: https://{ROUTER_HOST}:9088/proxy/coordinator/druid-ext/basic-security/authorization/db/b/users + // Proxy Request: https://{COORDINATOR_HOST}:8281/druid-ext/basic-security/authorization/db/b/users + private static final String ARBITRARY_COORDINATOR_BASE_PATH = "/proxy/coordinator"; + private static final String ARBITRARY_OVERLORD_BASE_PATH = "/proxy/overlord"; + + private final ObjectMapper jsonMapper; + private final Provider httpClientProvider; + private final DruidHttpClientConfig httpClientConfig; + private final DruidLeaderSelector coordLeaderSelector; + private final DruidLeaderSelector overlordLeaderSelector; + + @Inject + public AsyncManagementForwardingServlet( + @Json ObjectMapper jsonMapper, + @Global Provider httpClientProvider, + @Global DruidHttpClientConfig httpClientConfig, + @Coordinator DruidLeaderSelector coordLeaderSelector, + @IndexingService DruidLeaderSelector overlordLeaderSelector + ) + { + this.jsonMapper = jsonMapper; + this.httpClientProvider = httpClientProvider; + this.httpClientConfig = httpClientConfig; + this.coordLeaderSelector = coordLeaderSelector; + this.overlordLeaderSelector = overlordLeaderSelector; + } + + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + String currentLeader; + String requestURI = StringUtils.toLowerCase(request.getRequestURI()); + if (requestURI.startsWith(STANDARD_COORDINATOR_BASE_PATH)) { + currentLeader = coordLeaderSelector.getCurrentLeader(); + } else if (requestURI.startsWith(STANDARD_OVERLORD_BASE_PATH)) { + currentLeader = overlordLeaderSelector.getCurrentLeader(); + } else if (requestURI.startsWith(ARBITRARY_COORDINATOR_BASE_PATH)) { + currentLeader = coordLeaderSelector.getCurrentLeader(); + request.setAttribute( + MODIFIED_PATH_ATTRIBUTE, request.getRequestURI().substring(ARBITRARY_COORDINATOR_BASE_PATH.length()) + ); + } else if (requestURI.startsWith(ARBITRARY_OVERLORD_BASE_PATH)) { + currentLeader = overlordLeaderSelector.getCurrentLeader(); + request.setAttribute( + MODIFIED_PATH_ATTRIBUTE, request.getRequestURI().substring(ARBITRARY_OVERLORD_BASE_PATH.length()) + ); + } else { + handleBadRequest(response, StringUtils.format("Unsupported proxy destination [%s]", request.getRequestURI())); + return; + } + + if (currentLeader == null) { + handleBadRequest( + response, + StringUtils.format( + "Unable to determine destination for [%s]; is your coordinator/overlord running?", request.getRequestURI() + ) + ); + return; + } + + request.setAttribute(BASE_URI_ATTRIBUTE, currentLeader); + super.service(request, response); + } + + @Override + protected void sendProxyRequest( + HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest + ) + { + proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); + proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); + + clientRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); // auth is handled on the destination host + + super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + } + + @Override + protected String rewriteTarget(HttpServletRequest request) + { + try { + return new URIBuilder((String) request.getAttribute(BASE_URI_ATTRIBUTE)) + .setPath(request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null ? + (String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) : request.getRequestURI()) + .setQuery(request.getQueryString()) // No need to encode-decode queryString, it is already encoded + .build() + .toString(); + } + catch (URISyntaxException e) { + log.error(e, "Unable to rewrite URI [%s]", e.getMessage()); + throw Throwables.propagate(e); + } + } + + @Override + protected HttpClient newHttpClient() + { + return httpClientProvider.get(); + } + + @Override + protected HttpClient createHttpClient() throws ServletException + { + HttpClient client = super.createHttpClient(); + setTimeout(httpClientConfig.getReadTimeout().getMillis()); // override timeout set in ProxyServlet.createHttpClient + return client; + } + + private void handleBadRequest(HttpServletResponse response, String errorMessage) throws IOException + { + if (!response.isCommitted()) { + response.resetBuffer(); + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + jsonMapper.writeValue(response.getOutputStream(), ImmutableMap.of("error", errorMessage)); + } + response.flushBuffer(); + } +} diff --git a/server/src/main/java/io/druid/server/router/ManagementProxyConfig.java b/server/src/main/java/io/druid/server/router/ManagementProxyConfig.java new file mode 100644 index 00000000000..f43c026ac6b --- /dev/null +++ b/server/src/main/java/io/druid/server/router/ManagementProxyConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ManagementProxyConfig +{ + @JsonProperty + private boolean enabled = false; + + public boolean isEnabled() + { + return enabled; + } +} diff --git a/server/src/test/java/io/druid/server/AsyncManagementForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncManagementForwardingServletTest.java new file mode 100644 index 00000000000..a47a2a2c39c --- /dev/null +++ b/server/src/test/java/io/druid/server/AsyncManagementForwardingServletTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; +import com.google.inject.Key; +import io.druid.common.utils.SocketUtil; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; +import io.druid.guice.http.DruidHttpClientConfig; +import io.druid.initialization.Initialization; +import io.druid.java.util.common.StringUtils; +import io.druid.server.initialization.BaseJettyTest; +import io.druid.server.initialization.jetty.JettyServerInitUtils; +import io.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.commons.codec.Charsets; +import org.apache.commons.io.IOUtils; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; + +public class AsyncManagementForwardingServletTest extends BaseJettyTest +{ + private static final ExpectedRequest coordinatorExpectedRequest = new ExpectedRequest(); + private static final ExpectedRequest overlordExpectedRequest = new ExpectedRequest(); + + private static int coordinatorPort; + private static int overlordPort; + + private Server coordinator; + private Server overlord; + + private static class ExpectedRequest + { + private boolean called = false; + private String path; + private String query; + private String method; + private Map headers; + private String body; + + private void reset() + { + called = false; + path = null; + query = null; + method = null; + headers = null; + body = null; + } + } + + @Override + @Before + public void setup() throws Exception + { + super.setup(); + + coordinatorPort = SocketUtil.findOpenPortFrom(port + 1); + overlordPort = SocketUtil.findOpenPortFrom(coordinatorPort + 1); + + coordinator = makeTestServer(coordinatorPort, coordinatorExpectedRequest); + overlord = makeTestServer(overlordPort, overlordExpectedRequest); + + coordinator.start(); + overlord.start(); + } + + @After + public void tearDown() throws Exception + { + coordinator.stop(); + overlord.stop(); + + coordinatorExpectedRequest.reset(); + overlordExpectedRequest.reset(); + } + + @Override + protected Injector setupInjector() + { + return Initialization.makeInjectorWithModules(GuiceInjectors.makeStartupInjector(), ImmutableList.of((binder) -> { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null, null, true, false) + ); + binder.bind(JettyServerInitializer.class).to(ProxyJettyServerInit.class).in(LazySingleton.class); + LifecycleModule.register(binder, Server.class); + })); + } + + @Test + public void testCoordinatorDatasources() throws Exception + { + coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources"; + coordinatorExpectedRequest.method = "GET"; + coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testCoordinatorLoadStatus() throws Exception + { + coordinatorExpectedRequest.path = "/druid/coordinator/v1/loadstatus"; + coordinatorExpectedRequest.query = "full"; + coordinatorExpectedRequest.method = "GET"; + coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format( + "http://localhost:%d%s?%s", port, coordinatorExpectedRequest.path, coordinatorExpectedRequest.query + )).openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testCoordinatorEnable() throws Exception + { + coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources/myDatasource"; + coordinatorExpectedRequest.method = "POST"; + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testCoordinatorDisable() throws Exception + { + coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources/myDatasource/intervals/2016-06-27_2016-06-28"; + coordinatorExpectedRequest.method = "DELETE"; + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testCoordinatorProxyStatus() throws Exception + { + coordinatorExpectedRequest.path = "/status"; + coordinatorExpectedRequest.method = "GET"; + coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d/proxy/coordinator%s", port, coordinatorExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testCoordinatorProxySegments() throws Exception + { + coordinatorExpectedRequest.path = "/druid/coordinator/v1/metadata/datasources/myDatasource/segments"; + coordinatorExpectedRequest.method = "POST"; + coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + coordinatorExpectedRequest.body = "[\"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\", \"2012-01-05T00:00:00.000/2012-01-07T00:00:00.000\"]"; + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d/proxy/coordinator%s", port, coordinatorExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(coordinatorExpectedRequest.method); + + coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty); + + connection.setDoOutput(true); + OutputStream os = connection.getOutputStream(); + os.write(coordinatorExpectedRequest.body.getBytes(Charsets.UTF_8)); + os.close(); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testOverlordPostTask() throws Exception + { + overlordExpectedRequest.path = "/druid/indexer/v1/task"; + overlordExpectedRequest.method = "POST"; + overlordExpectedRequest.headers = ImmutableMap.of( + "Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=", + "Content-Type", "application/json" + ); + overlordExpectedRequest.body = "{\"type\": \"index\", \"spec\": \"stuffGoesHere\"}"; + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d%s", port, overlordExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(overlordExpectedRequest.method); + + overlordExpectedRequest.headers.forEach(connection::setRequestProperty); + + connection.setDoOutput(true); + OutputStream os = connection.getOutputStream(); + os.write(overlordExpectedRequest.body.getBytes(Charsets.UTF_8)); + os.close(); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called); + Assert.assertTrue("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testOverlordTaskStatus() throws Exception + { + overlordExpectedRequest.path = "/druid/indexer/v1/task/myTaskId/status"; + overlordExpectedRequest.method = "GET"; + overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d%s", port, overlordExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(overlordExpectedRequest.method); + + overlordExpectedRequest.headers.forEach(connection::setRequestProperty); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called); + Assert.assertTrue("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testOverlordProxyLeader() throws Exception + { + overlordExpectedRequest.path = "/druid/indexer/v1/leader"; + overlordExpectedRequest.method = "GET"; + overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); + + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d/proxy/overlord/%s", port, overlordExpectedRequest.path)) + .openConnection()); + connection.setRequestMethod(overlordExpectedRequest.method); + + overlordExpectedRequest.headers.forEach(connection::setRequestProperty); + + Assert.assertEquals(200, connection.getResponseCode()); + Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called); + Assert.assertTrue("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testBadProxyDestination() throws Exception + { + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d/proxy/other/status", port)).openConnection()); + connection.setRequestMethod("GET"); + + Assert.assertEquals(400, connection.getResponseCode()); + Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + @Test + public void testLocalRequest() throws Exception + { + HttpURLConnection connection = ((HttpURLConnection) + new URL(StringUtils.format("http://localhost:%d/status", port)).openConnection()); + connection.setRequestMethod("GET"); + + Assert.assertEquals(404, connection.getResponseCode()); + Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called); + Assert.assertFalse("overlord called", overlordExpectedRequest.called); + } + + private static Server makeTestServer(int port, ExpectedRequest expectedRequest) + { + Server server = new Server(port); + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(new ServletHolder(new HttpServlet() + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + handle(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + handle(req, resp); + } + + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + handle(req, resp); + } + + @Override + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + handle(req, resp); + } + + private void handle(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + boolean passed = expectedRequest.path.equals(req.getRequestURI()); + passed &= expectedRequest.query == null || expectedRequest.query.equals(req.getQueryString()); + passed &= expectedRequest.method.equals(req.getMethod()); + + if (expectedRequest.headers != null) { + for (Map.Entry header : expectedRequest.headers.entrySet()) { + passed &= header.getValue().equals(req.getHeader(header.getKey())); + } + } + + passed &= expectedRequest.body == null || expectedRequest.body.equals(IOUtils.toString(req.getReader())); + + expectedRequest.called = true; + resp.setStatus(passed ? 200 : 400); + } + }), "/*"); + + server.setHandler(handler); + return server; + } + + public static class ProxyJettyServerInit implements JettyServerInitializer + { + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + + final DruidLeaderSelector coordinatorLeaderSelector = new TestDruidLeaderSelector() + { + @Override + public String getCurrentLeader() + { + return StringUtils.format("http://localhost:%d", coordinatorPort); + } + }; + + final DruidLeaderSelector overlordLeaderSelector = new TestDruidLeaderSelector() + { + @Override + public String getCurrentLeader() + { + return StringUtils.format("http://localhost:%d", overlordPort); + } + }; + + ServletHolder holder = new ServletHolder( + new AsyncManagementForwardingServlet( + injector.getInstance(ObjectMapper.class), + injector.getProvider(HttpClient.class), + injector.getInstance(DruidHttpClientConfig.class), + coordinatorLeaderSelector, + overlordLeaderSelector + ) + ); + + //NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152 + holder.setInitParameter("maxThreads", "256"); + + root.addServlet(holder, "/druid/coordinator/*"); + root.addServlet(holder, "/druid/indexer/*"); + root.addServlet(holder, "/proxy/*"); + + JettyServerInitUtils.addExtensionFilters(root, injector); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root)}); + server.setHandler(handlerList); + } + } + + private static class TestDruidLeaderSelector implements DruidLeaderSelector + { + @Nullable + @Override + public String getCurrentLeader() + { + return null; + } + + @Override + public boolean isLeader() + { + return false; + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) {} + + @Override + public void unregisterListener() {} + } +} diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index b480c13125f..e16b1c798cd 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -26,7 +26,6 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; -import io.druid.java.util.http.client.HttpClient; import io.airlift.airline.Command; import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; @@ -45,6 +44,7 @@ import io.druid.guice.annotations.EscalatedGlobal; import io.druid.guice.annotations.Self; import io.druid.guice.http.JettyHttpClientModule; import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.http.client.HttpClient; import io.druid.query.lookup.LookupModule; import io.druid.server.AsyncQueryForwardingServlet; import io.druid.server.http.RouterResource; @@ -52,6 +52,7 @@ import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.router.AvaticaConnectionBalancer; import io.druid.server.router.CoordinatorRuleManager; +import io.druid.server.router.ManagementProxyConfig; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import io.druid.server.router.TieredBrokerConfig; @@ -85,6 +86,7 @@ public class CliRouter extends ServerRunnable new QueryableModule(), new QueryRunnerFactoryModule(), new JettyHttpClientModule("druid.router.http", Router.class), + JettyHttpClientModule.global(), new Module() { @Override @@ -96,6 +98,7 @@ public class CliRouter extends ServerRunnable JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class); JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class); + JsonConfigProvider.bind(binder, "druid.router.managementProxy", ManagementProxyConfig.class); binder.bind(CoordinatorRuleManager.class); LifecycleModule.register(binder, CoordinatorRuleManager.class); diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index fc00ca644cb..8e2e639da1d 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -25,14 +25,15 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.servlet.GuiceFilter; +import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.guice.http.DruidHttpClientConfig; -import io.druid.java.util.common.logger.Logger; +import io.druid.server.AsyncManagementForwardingServlet; import io.druid.server.AsyncQueryForwardingServlet; import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitializer; +import io.druid.server.router.ManagementProxyConfig; import io.druid.server.router.Router; -import io.druid.server.security.AuthConfig; import io.druid.server.security.AuthenticationUtils; import io.druid.server.security.Authenticator; import io.druid.server.security.AuthenticatorMapper; @@ -43,29 +44,35 @@ import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import javax.servlet.Servlet; import java.util.List; -/** - */ public class RouterJettyServerInitializer implements JettyServerInitializer { - private static Logger log = new Logger(RouterJettyServerInitializer.class); - private static List UNSECURED_PATHS = Lists.newArrayList( "/status/health" ); + private final DruidHttpClientConfig routerHttpClientConfig; + private final DruidHttpClientConfig globalHttpClientConfig; + private final ManagementProxyConfig managementProxyConfig; private final AsyncQueryForwardingServlet asyncQueryForwardingServlet; - private final DruidHttpClientConfig httpClientConfig; + private final AsyncManagementForwardingServlet asyncManagementForwardingServlet; @Inject public RouterJettyServerInitializer( - @Router DruidHttpClientConfig httpClientConfig, - AsyncQueryForwardingServlet asyncQueryForwardingServlet + @Router DruidHttpClientConfig routerHttpClientConfig, + @Global DruidHttpClientConfig globalHttpClientConfig, + ManagementProxyConfig managementProxyConfig, + AsyncQueryForwardingServlet asyncQueryForwardingServlet, + AsyncManagementForwardingServlet asyncManagementForwardingServlet ) { - this.httpClientConfig = httpClientConfig; + this.routerHttpClientConfig = routerHttpClientConfig; + this.globalHttpClientConfig = globalHttpClientConfig; + this.managementProxyConfig = managementProxyConfig; this.asyncQueryForwardingServlet = asyncQueryForwardingServlet; + this.asyncManagementForwardingServlet = asyncManagementForwardingServlet; } @Override @@ -75,29 +82,26 @@ public class RouterJettyServerInitializer implements JettyServerInitializer root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - asyncQueryForwardingServlet.setTimeout(httpClientConfig.getReadTimeout().getMillis()); - ServletHolder sh = new ServletHolder(asyncQueryForwardingServlet); - //NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152 - sh.setInitParameter("maxThreads", Integer.toString(httpClientConfig.getNumMaxThreads())); + root.addServlet(buildServletHolder(asyncQueryForwardingServlet, routerHttpClientConfig), "/druid/v2/*"); - //Needs to be set in servlet config or else overridden to default value in AbstractProxyServlet.createHttpClient() - sh.setInitParameter("maxConnections", Integer.toString(httpClientConfig.getNumConnections())); - sh.setInitParameter("idleTimeout", Long.toString(httpClientConfig.getReadTimeout().getMillis())); - sh.setInitParameter("timeout", Long.toString(httpClientConfig.getReadTimeout().getMillis())); + if (managementProxyConfig.isEnabled()) { + ServletHolder managementForwardingServletHolder = buildServletHolder( + asyncManagementForwardingServlet, globalHttpClientConfig + ); + root.addServlet(managementForwardingServletHolder, "/druid/coordinator/*"); + root.addServlet(managementForwardingServletHolder, "/druid/indexer/*"); + root.addServlet(managementForwardingServletHolder, "/proxy/*"); + } - root.addServlet(sh, "/druid/v2/*"); - - final AuthConfig authConfig = injector.getInstance(AuthConfig.class); final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class); - List authenticators = null; AuthenticationUtils.addSecuritySanityCheckFilter(root, jsonMapper); // perform no-op authorization for these resources AuthenticationUtils.addNoopAuthorizationFilters(root, UNSECURED_PATHS); - authenticators = authenticatorMapper.getAuthenticatorChain(); + final List authenticators = authenticatorMapper.getAuthenticatorChain(); AuthenticationUtils.addAuthenticationFilterChain(root, authenticators); JettyServerInitUtils.addExtensionFilters(root, injector); @@ -123,4 +127,19 @@ public class RouterJettyServerInitializer implements JettyServerInitializer ); server.setHandler(handlerList); } + + private ServletHolder buildServletHolder(Servlet servlet, DruidHttpClientConfig httpClientConfig) + { + ServletHolder sh = new ServletHolder(servlet); + + //NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152 + sh.setInitParameter("maxThreads", Integer.toString(httpClientConfig.getNumMaxThreads())); + + //Needs to be set in servlet config or else overridden to default value in AbstractProxyServlet.createHttpClient() + sh.setInitParameter("maxConnections", Integer.toString(httpClientConfig.getNumConnections())); + sh.setInitParameter("idleTimeout", Long.toString(httpClientConfig.getReadTimeout().getMillis())); + sh.setInitParameter("timeout", Long.toString(httpClientConfig.getReadTimeout().getMillis())); + + return sh; + } }