Support for router forwarding requests to active coordinator/overlord (#5369)

* allow router to forward requests to coordinator and overlord

* fix forbidden API

* more forbidden api fixes

* code review changes
This commit is contained in:
David Lim 2018-02-15 15:38:58 -07:00 committed by Jonathan Wei
parent c45fe37611
commit 20a3164180
8 changed files with 791 additions and 41 deletions

View File

@ -182,3 +182,41 @@ Returns the dimensions of the datasource.
* `/druid/v2/datasources/{dataSourceName}/metrics`
Returns the metrics of the datasource.
Router as Management Proxy
--------------------------
The router can be configured to forward requests to the active coordinator or overlord node. This may be useful for
setting up a highly available cluster in situations where the HTTP redirect mechanism of the inactive -> active
coordinator/overlord does not function correctly (servers are behind a load balancer, the hostname used in the redirect
is only resolvable internally, etc.).
### Enabling the Management Proxy
To enable this functionality, set the following in the router's runtime.properties:
```
druid.router.managementProxy.enabled=true
```
### Routing
The management proxy supports implicit and explicit routes. Implicit routes are those where the destination can be
determined from the original request path based on Druid API path conventions. For the coordinator the convention is
`/druid/coordinator/*` and for the overlord the convention is `/druid/indexer/*`. These are convenient because they mean
that using the management proxy does not require modifying the API request other than issuing the request to the router
instead of the coordinator or overlord. Most Druid API requests can be routed implicitly.
Explicit routes are those where the request to the router contains a path prefix indicating which node the request
should be routed to. For the coordinator this prefix is `/proxy/coordinator` and for the overlord it is `/proxy/overlord`.
This is required for API calls with an ambiguous destination. For example, the `/status` API is present on all Druid
nodes, so explicit routing needs to be used to indicate the proxy destination.
This is summarized in the table below:
|Request Route|Destination|Rewritten Route|Example|
|-------------|-----------|---------------|-------|
|`/druid/coordinator/*`|Coordinator|`/druid/coordinator/*`|`router:8888/druid/coordinator/v1/datasources` -> `coordinator:8081/druid/coordinator/v1/datasources`|
|`/druid/indexer/*`|Overlord|`/druid/indexer/*`|`router:8888/druid/indexer/v1/task` -> `overlord:8090/druid/indexer/v1/task`|
|`/proxy/coordinator/*`|Coordinator|`/*`|`router:8888/proxy/coordinator/status` -> `coordinator:8081/status`|
|`/proxy/overlord/*`|Overlord|`/*`|`router:8888/proxy/overlord/druid/indexer/v1/isLeader` -> `overlord:8090/druid/indexer/v1/isLeader`|

View File

@ -21,7 +21,6 @@ package io.druid.curator.discovery;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.guice.annotations.Self;
@ -29,6 +28,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.DruidNode;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
@ -38,7 +38,6 @@ import org.apache.curator.framework.recipes.leader.Participant;
import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -66,13 +65,21 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
this.curator = curator;
this.self = self;
this.latchPath = latchPath;
// Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership
// election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current
// leader without being involved in the election.
this.leaderLatch.set(createNewLeaderLatch());
}
private LeaderLatch createNewLeaderLatch()
{
final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse()
);
return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse());
}
private LeaderLatch createNewLeaderLatchWithListener()
{
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
newLeaderLatch.addListener(
new LeaderLatchListener()
@ -94,7 +101,7 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
// give others a chance to become leader.
final LeaderLatch oldLatch = createNewLeaderLatch();
final LeaderLatch oldLatch = createNewLeaderLatchWithListener();
CloseQuietly.close(oldLatch);
leader = false;
try {
@ -138,10 +145,6 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
@Override
public String getCurrentLeader()
{
if (!lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
throw new ISE("not started");
}
try {
final LeaderLatch latch = leaderLatch.get();
@ -181,7 +184,7 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
this.listener = listener;
this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath));
createNewLeaderLatch();
createNewLeaderLatchWithListener();
leaderLatch.get().start();
lifecycleLock.started();

View File

@ -26,7 +26,7 @@ import javax.annotation.Nullable;
* which expect appropriate implementation available in guice annotated with @IndexingService and @Coordinator
* respectively.
*
* Usage is as follow.
* Usage is as follows:
* On lifecycle start:
* druidLeaderSelector.registerListener(myListener);
*
@ -35,7 +35,6 @@ import javax.annotation.Nullable;
*/
public interface DruidLeaderSelector
{
/**
* Get ID of current Leader. Returns NULL if it can't find the leader.
* Note that it is possible for leadership to change right after this call returns, so caller would get wrong
@ -73,14 +72,14 @@ public interface DruidLeaderSelector
interface Listener
{
/**
* Notification that this node should start activities to be done by the leader. if this method throws
* exception then implementation would try to resign its leadership in the underlying system such as curator.
* Notification that this node should start activities to be done by the leader. If this method throws an
* exception, the implementation should resign leadership in the underlying system such as curator.
*/
void becomeLeader();
/**
* Notification that shid node should stop acitivities which are done by the leader. If this method throws
* exception then an alert is created.
* Notification that this node should stop activities which are done by the leader. If this method throws
* an exception, an alert should be created.
*/
void stopBeingLeader();
}

View File

@ -0,0 +1,183 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.druid.client.coordinator.Coordinator;
import io.druid.client.indexing.IndexingService;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.security.AuthConfig;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
public class AsyncManagementForwardingServlet extends AsyncProxyServlet
{
private static final EmittingLogger log = new EmittingLogger(AsyncManagementForwardingServlet.class);
private static final String BASE_URI_ATTRIBUTE = "io.druid.proxy.to.base.uri";
private static final String MODIFIED_PATH_ATTRIBUTE = "io.druid.proxy.to.path";
// These are the typical path conventions for the coordinator and overlord APIs. If we see one of these paths, we will
// forward the request with the path unmodified, e.g.:
// Client Request: https://{ROUTER_HOST}:9088/druid/coordinator/v1/loadstatus?full
// Proxy Request: https://{COORDINATOR_HOST}:8281/druid/coordinator/v1/loadstatus?full
private static final String STANDARD_COORDINATOR_BASE_PATH = "/druid/coordinator";
private static final String STANDARD_OVERLORD_BASE_PATH = "/druid/indexer";
// But there are some cases where the path is either ambiguous or collides with other servlet pathSpecs and where it
// is desirable to explicitly state the destination host. In these cases, we will forward the request with the proxy
// destination component of the path stripped, e.g.:
// Client Request: https://{ROUTER_HOST}:9088/proxy/coordinator/druid-ext/basic-security/authorization/db/b/users
// Proxy Request: https://{COORDINATOR_HOST}:8281/druid-ext/basic-security/authorization/db/b/users
private static final String ARBITRARY_COORDINATOR_BASE_PATH = "/proxy/coordinator";
private static final String ARBITRARY_OVERLORD_BASE_PATH = "/proxy/overlord";
private final ObjectMapper jsonMapper;
private final Provider<HttpClient> httpClientProvider;
private final DruidHttpClientConfig httpClientConfig;
private final DruidLeaderSelector coordLeaderSelector;
private final DruidLeaderSelector overlordLeaderSelector;
@Inject
public AsyncManagementForwardingServlet(
@Json ObjectMapper jsonMapper,
@Global Provider<HttpClient> httpClientProvider,
@Global DruidHttpClientConfig httpClientConfig,
@Coordinator DruidLeaderSelector coordLeaderSelector,
@IndexingService DruidLeaderSelector overlordLeaderSelector
)
{
this.jsonMapper = jsonMapper;
this.httpClientProvider = httpClientProvider;
this.httpClientConfig = httpClientConfig;
this.coordLeaderSelector = coordLeaderSelector;
this.overlordLeaderSelector = overlordLeaderSelector;
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
String currentLeader;
String requestURI = StringUtils.toLowerCase(request.getRequestURI());
if (requestURI.startsWith(STANDARD_COORDINATOR_BASE_PATH)) {
currentLeader = coordLeaderSelector.getCurrentLeader();
} else if (requestURI.startsWith(STANDARD_OVERLORD_BASE_PATH)) {
currentLeader = overlordLeaderSelector.getCurrentLeader();
} else if (requestURI.startsWith(ARBITRARY_COORDINATOR_BASE_PATH)) {
currentLeader = coordLeaderSelector.getCurrentLeader();
request.setAttribute(
MODIFIED_PATH_ATTRIBUTE, request.getRequestURI().substring(ARBITRARY_COORDINATOR_BASE_PATH.length())
);
} else if (requestURI.startsWith(ARBITRARY_OVERLORD_BASE_PATH)) {
currentLeader = overlordLeaderSelector.getCurrentLeader();
request.setAttribute(
MODIFIED_PATH_ATTRIBUTE, request.getRequestURI().substring(ARBITRARY_OVERLORD_BASE_PATH.length())
);
} else {
handleBadRequest(response, StringUtils.format("Unsupported proxy destination [%s]", request.getRequestURI()));
return;
}
if (currentLeader == null) {
handleBadRequest(
response,
StringUtils.format(
"Unable to determine destination for [%s]; is your coordinator/overlord running?", request.getRequestURI()
)
);
return;
}
request.setAttribute(BASE_URI_ATTRIBUTE, currentLeader);
super.service(request, response);
}
@Override
protected void sendProxyRequest(
HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest
)
{
proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
clientRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); // auth is handled on the destination host
super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
}
@Override
protected String rewriteTarget(HttpServletRequest request)
{
try {
return new URIBuilder((String) request.getAttribute(BASE_URI_ATTRIBUTE))
.setPath(request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null ?
(String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) : request.getRequestURI())
.setQuery(request.getQueryString()) // No need to encode-decode queryString, it is already encoded
.build()
.toString();
}
catch (URISyntaxException e) {
log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
throw Throwables.propagate(e);
}
}
@Override
protected HttpClient newHttpClient()
{
return httpClientProvider.get();
}
@Override
protected HttpClient createHttpClient() throws ServletException
{
HttpClient client = super.createHttpClient();
setTimeout(httpClientConfig.getReadTimeout().getMillis()); // override timeout set in ProxyServlet.createHttpClient
return client;
}
private void handleBadRequest(HttpServletResponse response, String errorMessage) throws IOException
{
if (!response.isCommitted()) {
response.resetBuffer();
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
jsonMapper.writeValue(response.getOutputStream(), ImmutableMap.of("error", errorMessage));
}
response.flushBuffer();
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ManagementProxyConfig
{
@JsonProperty
private boolean enabled = false;
public boolean isEnabled()
{
return enabled;
}
}

View File

@ -0,0 +1,472 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
import io.druid.common.utils.SocketUtil;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Self;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.initialization.Initialization;
import io.druid.java.util.common.StringUtils;
import io.druid.server.initialization.BaseJettyTest;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.commons.codec.Charsets;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
public class AsyncManagementForwardingServletTest extends BaseJettyTest
{
private static final ExpectedRequest coordinatorExpectedRequest = new ExpectedRequest();
private static final ExpectedRequest overlordExpectedRequest = new ExpectedRequest();
private static int coordinatorPort;
private static int overlordPort;
private Server coordinator;
private Server overlord;
private static class ExpectedRequest
{
private boolean called = false;
private String path;
private String query;
private String method;
private Map<String, String> headers;
private String body;
private void reset()
{
called = false;
path = null;
query = null;
method = null;
headers = null;
body = null;
}
}
@Override
@Before
public void setup() throws Exception
{
super.setup();
coordinatorPort = SocketUtil.findOpenPortFrom(port + 1);
overlordPort = SocketUtil.findOpenPortFrom(coordinatorPort + 1);
coordinator = makeTestServer(coordinatorPort, coordinatorExpectedRequest);
overlord = makeTestServer(overlordPort, overlordExpectedRequest);
coordinator.start();
overlord.start();
}
@After
public void tearDown() throws Exception
{
coordinator.stop();
overlord.stop();
coordinatorExpectedRequest.reset();
overlordExpectedRequest.reset();
}
@Override
protected Injector setupInjector()
{
return Initialization.makeInjectorWithModules(GuiceInjectors.makeStartupInjector(), ImmutableList.of((binder) -> {
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null, null, true, false)
);
binder.bind(JettyServerInitializer.class).to(ProxyJettyServerInit.class).in(LazySingleton.class);
LifecycleModule.register(binder, Server.class);
}));
}
@Test
public void testCoordinatorDatasources() throws Exception
{
coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources";
coordinatorExpectedRequest.method = "GET";
coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path))
.openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testCoordinatorLoadStatus() throws Exception
{
coordinatorExpectedRequest.path = "/druid/coordinator/v1/loadstatus";
coordinatorExpectedRequest.query = "full";
coordinatorExpectedRequest.method = "GET";
coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format(
"http://localhost:%d%s?%s", port, coordinatorExpectedRequest.path, coordinatorExpectedRequest.query
)).openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testCoordinatorEnable() throws Exception
{
coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources/myDatasource";
coordinatorExpectedRequest.method = "POST";
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path))
.openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testCoordinatorDisable() throws Exception
{
coordinatorExpectedRequest.path = "/druid/coordinator/v1/datasources/myDatasource/intervals/2016-06-27_2016-06-28";
coordinatorExpectedRequest.method = "DELETE";
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d%s", port, coordinatorExpectedRequest.path))
.openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testCoordinatorProxyStatus() throws Exception
{
coordinatorExpectedRequest.path = "/status";
coordinatorExpectedRequest.method = "GET";
coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/proxy/coordinator%s", port, coordinatorExpectedRequest.path))
.openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testCoordinatorProxySegments() throws Exception
{
coordinatorExpectedRequest.path = "/druid/coordinator/v1/metadata/datasources/myDatasource/segments";
coordinatorExpectedRequest.method = "POST";
coordinatorExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
coordinatorExpectedRequest.body = "[\"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\", \"2012-01-05T00:00:00.000/2012-01-07T00:00:00.000\"]";
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/proxy/coordinator%s", port, coordinatorExpectedRequest.path))
.openConnection());
connection.setRequestMethod(coordinatorExpectedRequest.method);
coordinatorExpectedRequest.headers.forEach(connection::setRequestProperty);
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(coordinatorExpectedRequest.body.getBytes(Charsets.UTF_8));
os.close();
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertTrue("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testOverlordPostTask() throws Exception
{
overlordExpectedRequest.path = "/druid/indexer/v1/task";
overlordExpectedRequest.method = "POST";
overlordExpectedRequest.headers = ImmutableMap.of(
"Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=",
"Content-Type", "application/json"
);
overlordExpectedRequest.body = "{\"type\": \"index\", \"spec\": \"stuffGoesHere\"}";
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d%s", port, overlordExpectedRequest.path))
.openConnection());
connection.setRequestMethod(overlordExpectedRequest.method);
overlordExpectedRequest.headers.forEach(connection::setRequestProperty);
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(overlordExpectedRequest.body.getBytes(Charsets.UTF_8));
os.close();
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called);
Assert.assertTrue("overlord called", overlordExpectedRequest.called);
}
@Test
public void testOverlordTaskStatus() throws Exception
{
overlordExpectedRequest.path = "/druid/indexer/v1/task/myTaskId/status";
overlordExpectedRequest.method = "GET";
overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d%s", port, overlordExpectedRequest.path))
.openConnection());
connection.setRequestMethod(overlordExpectedRequest.method);
overlordExpectedRequest.headers.forEach(connection::setRequestProperty);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called);
Assert.assertTrue("overlord called", overlordExpectedRequest.called);
}
@Test
public void testOverlordProxyLeader() throws Exception
{
overlordExpectedRequest.path = "/druid/indexer/v1/leader";
overlordExpectedRequest.method = "GET";
overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/proxy/overlord/%s", port, overlordExpectedRequest.path))
.openConnection());
connection.setRequestMethod(overlordExpectedRequest.method);
overlordExpectedRequest.headers.forEach(connection::setRequestProperty);
Assert.assertEquals(200, connection.getResponseCode());
Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called);
Assert.assertTrue("overlord called", overlordExpectedRequest.called);
}
@Test
public void testBadProxyDestination() throws Exception
{
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/proxy/other/status", port)).openConnection());
connection.setRequestMethod("GET");
Assert.assertEquals(400, connection.getResponseCode());
Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
@Test
public void testLocalRequest() throws Exception
{
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/status", port)).openConnection());
connection.setRequestMethod("GET");
Assert.assertEquals(404, connection.getResponseCode());
Assert.assertFalse("coordinator called", coordinatorExpectedRequest.called);
Assert.assertFalse("overlord called", overlordExpectedRequest.called);
}
private static Server makeTestServer(int port, ExpectedRequest expectedRequest)
{
Server server = new Server(port);
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(new ServletHolder(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
handle(req, resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
handle(req, resp);
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
handle(req, resp);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
handle(req, resp);
}
private void handle(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
boolean passed = expectedRequest.path.equals(req.getRequestURI());
passed &= expectedRequest.query == null || expectedRequest.query.equals(req.getQueryString());
passed &= expectedRequest.method.equals(req.getMethod());
if (expectedRequest.headers != null) {
for (Map.Entry<String, String> header : expectedRequest.headers.entrySet()) {
passed &= header.getValue().equals(req.getHeader(header.getKey()));
}
}
passed &= expectedRequest.body == null || expectedRequest.body.equals(IOUtils.toString(req.getReader()));
expectedRequest.called = true;
resp.setStatus(passed ? 200 : 400);
}
}), "/*");
server.setHandler(handler);
return server;
}
public static class ProxyJettyServerInit implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
final DruidLeaderSelector coordinatorLeaderSelector = new TestDruidLeaderSelector()
{
@Override
public String getCurrentLeader()
{
return StringUtils.format("http://localhost:%d", coordinatorPort);
}
};
final DruidLeaderSelector overlordLeaderSelector = new TestDruidLeaderSelector()
{
@Override
public String getCurrentLeader()
{
return StringUtils.format("http://localhost:%d", overlordPort);
}
};
ServletHolder holder = new ServletHolder(
new AsyncManagementForwardingServlet(
injector.getInstance(ObjectMapper.class),
injector.getProvider(HttpClient.class),
injector.getInstance(DruidHttpClientConfig.class),
coordinatorLeaderSelector,
overlordLeaderSelector
)
);
//NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152
holder.setInitParameter("maxThreads", "256");
root.addServlet(holder, "/druid/coordinator/*");
root.addServlet(holder, "/druid/indexer/*");
root.addServlet(holder, "/proxy/*");
JettyServerInitUtils.addExtensionFilters(root, injector);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root)});
server.setHandler(handlerList);
}
}
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
@Nullable
@Override
public String getCurrentLeader()
{
return null;
}
@Override
public boolean isLeader()
{
return false;
}
@Override
public int localTerm()
{
return 0;
}
@Override
public void registerListener(Listener listener) {}
@Override
public void unregisterListener() {}
}
}

View File

@ -26,7 +26,6 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import io.druid.java.util.http.client.HttpClient;
import io.airlift.airline.Command;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
@ -45,6 +44,7 @@ import io.druid.guice.annotations.EscalatedGlobal;
import io.druid.guice.annotations.Self;
import io.druid.guice.http.JettyHttpClientModule;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.query.lookup.LookupModule;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.http.RouterResource;
@ -52,6 +52,7 @@ import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.router.AvaticaConnectionBalancer;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.ManagementProxyConfig;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import io.druid.server.router.TieredBrokerConfig;
@ -85,6 +86,7 @@ public class CliRouter extends ServerRunnable
new QueryableModule(),
new QueryRunnerFactoryModule(),
new JettyHttpClientModule("druid.router.http", Router.class),
JettyHttpClientModule.global(),
new Module()
{
@Override
@ -96,6 +98,7 @@ public class CliRouter extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class);
JsonConfigProvider.bind(binder, "druid.router.managementProxy", ManagementProxyConfig.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);

View File

@ -25,14 +25,15 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.servlet.GuiceFilter;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.AsyncManagementForwardingServlet;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.router.ManagementProxyConfig;
import io.druid.server.router.Router;
import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthenticationUtils;
import io.druid.server.security.Authenticator;
import io.druid.server.security.AuthenticatorMapper;
@ -43,29 +44,35 @@ import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import javax.servlet.Servlet;
import java.util.List;
/**
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private static Logger log = new Logger(RouterJettyServerInitializer.class);
private static List<String> UNSECURED_PATHS = Lists.newArrayList(
"/status/health"
);
private final DruidHttpClientConfig routerHttpClientConfig;
private final DruidHttpClientConfig globalHttpClientConfig;
private final ManagementProxyConfig managementProxyConfig;
private final AsyncQueryForwardingServlet asyncQueryForwardingServlet;
private final DruidHttpClientConfig httpClientConfig;
private final AsyncManagementForwardingServlet asyncManagementForwardingServlet;
@Inject
public RouterJettyServerInitializer(
@Router DruidHttpClientConfig httpClientConfig,
AsyncQueryForwardingServlet asyncQueryForwardingServlet
@Router DruidHttpClientConfig routerHttpClientConfig,
@Global DruidHttpClientConfig globalHttpClientConfig,
ManagementProxyConfig managementProxyConfig,
AsyncQueryForwardingServlet asyncQueryForwardingServlet,
AsyncManagementForwardingServlet asyncManagementForwardingServlet
)
{
this.httpClientConfig = httpClientConfig;
this.routerHttpClientConfig = routerHttpClientConfig;
this.globalHttpClientConfig = globalHttpClientConfig;
this.managementProxyConfig = managementProxyConfig;
this.asyncQueryForwardingServlet = asyncQueryForwardingServlet;
this.asyncManagementForwardingServlet = asyncManagementForwardingServlet;
}
@Override
@ -75,29 +82,26 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
asyncQueryForwardingServlet.setTimeout(httpClientConfig.getReadTimeout().getMillis());
ServletHolder sh = new ServletHolder(asyncQueryForwardingServlet);
//NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152
sh.setInitParameter("maxThreads", Integer.toString(httpClientConfig.getNumMaxThreads()));
root.addServlet(buildServletHolder(asyncQueryForwardingServlet, routerHttpClientConfig), "/druid/v2/*");
//Needs to be set in servlet config or else overridden to default value in AbstractProxyServlet.createHttpClient()
sh.setInitParameter("maxConnections", Integer.toString(httpClientConfig.getNumConnections()));
sh.setInitParameter("idleTimeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
sh.setInitParameter("timeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
if (managementProxyConfig.isEnabled()) {
ServletHolder managementForwardingServletHolder = buildServletHolder(
asyncManagementForwardingServlet, globalHttpClientConfig
);
root.addServlet(managementForwardingServletHolder, "/druid/coordinator/*");
root.addServlet(managementForwardingServletHolder, "/druid/indexer/*");
root.addServlet(managementForwardingServletHolder, "/proxy/*");
}
root.addServlet(sh, "/druid/v2/*");
final AuthConfig authConfig = injector.getInstance(AuthConfig.class);
final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class);
List<Authenticator> authenticators = null;
AuthenticationUtils.addSecuritySanityCheckFilter(root, jsonMapper);
// perform no-op authorization for these resources
AuthenticationUtils.addNoopAuthorizationFilters(root, UNSECURED_PATHS);
authenticators = authenticatorMapper.getAuthenticatorChain();
final List<Authenticator> authenticators = authenticatorMapper.getAuthenticatorChain();
AuthenticationUtils.addAuthenticationFilterChain(root, authenticators);
JettyServerInitUtils.addExtensionFilters(root, injector);
@ -123,4 +127,19 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
);
server.setHandler(handlerList);
}
private ServletHolder buildServletHolder(Servlet servlet, DruidHttpClientConfig httpClientConfig)
{
ServletHolder sh = new ServletHolder(servlet);
//NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152
sh.setInitParameter("maxThreads", Integer.toString(httpClientConfig.getNumMaxThreads()));
//Needs to be set in servlet config or else overridden to default value in AbstractProxyServlet.createHttpClient()
sh.setInitParameter("maxConnections", Integer.toString(httpClientConfig.getNumConnections()));
sh.setInitParameter("idleTimeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
sh.setInitParameter("timeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
return sh;
}
}