URL encode datasources, task ids, authenticator names. (#5938)

* URL encode datasources, task ids, authenticator names.

* Fix URL encoding for router forwarding servlets.

* Fix log-with-offset API.

* Fix test.

* Test adjustments.

* Task client fixes.

* Remove unused import.
This commit is contained in:
Gian Merlino 2018-09-30 12:29:51 -07:00 committed by Fangjin Yang
parent 3548396a45
commit 9fa4afdb8e
23 changed files with 300 additions and 166 deletions

View File

@ -184,7 +184,7 @@ public class CommonCacheNotifier
druidNode.getServiceScheme(),
druidNode.getHost(),
druidNode.getPortToUse(),
StringUtils.format(baseUrl, itemName)
StringUtils.format(baseUrl, StringUtils.urlEncode(itemName))
);
}
catch (MalformedURLException mue) {

View File

@ -51,7 +51,7 @@ import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
@ -174,12 +174,12 @@ public abstract class IndexTaskClient implements AutoCloseable
protected FullResponseHolder submitRequestWithEmptyContent(
String taskId,
HttpMethod method,
String pathSuffix,
@Nullable String query,
String encodedPathSuffix,
@Nullable String encodedQueryString,
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
{
return submitRequest(taskId, null, method, pathSuffix, query, new byte[0], retry);
return submitRequest(taskId, null, method, encodedPathSuffix, encodedQueryString, new byte[0], retry);
}
/**
@ -188,13 +188,21 @@ public abstract class IndexTaskClient implements AutoCloseable
protected FullResponseHolder submitJsonRequest(
String taskId,
HttpMethod method,
String pathSuffix,
@Nullable String query,
String encodedPathSuffix,
@Nullable String encodedQueryString,
byte[] content,
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
{
return submitRequest(taskId, MediaType.APPLICATION_JSON, method, pathSuffix, query, content, retry);
return submitRequest(
taskId,
MediaType.APPLICATION_JSON,
method,
encodedPathSuffix,
encodedQueryString,
content,
retry
);
}
/**
@ -203,13 +211,21 @@ public abstract class IndexTaskClient implements AutoCloseable
protected FullResponseHolder submitSmileRequest(
String taskId,
HttpMethod method,
String pathSuffix,
@Nullable String query,
String encodedPathSuffix,
@Nullable String encodedQueryString,
byte[] content,
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
{
return submitRequest(taskId, SmileMediaTypes.APPLICATION_JACKSON_SMILE, method, pathSuffix, query, content, retry);
return submitRequest(
taskId,
SmileMediaTypes.APPLICATION_JACKSON_SMILE,
method,
encodedPathSuffix,
encodedQueryString,
content,
retry
);
}
/**
@ -219,8 +235,8 @@ public abstract class IndexTaskClient implements AutoCloseable
String taskId,
@Nullable String mediaType, // nullable if content is empty
HttpMethod method,
String pathSuffix,
@Nullable String query,
String encodedPathSuffix,
@Nullable String encodedQueryString,
byte[] content,
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
@ -231,7 +247,7 @@ public abstract class IndexTaskClient implements AutoCloseable
FullResponseHolder response = null;
Request request = null;
TaskLocation location = TaskLocation.unknown();
String path = StringUtils.format("%s/%s/%s", BASE_PATH, taskId, pathSuffix);
String path = StringUtils.format("%s/%s/%s", BASE_PATH, StringUtils.urlEncode(taskId), encodedPathSuffix);
Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
if (!status.isPresent() || !status.get().isRunnable()) {
@ -260,16 +276,14 @@ public abstract class IndexTaskClient implements AutoCloseable
checkConnection(host, port);
try {
URI serviceUri = new URI(
// Use URL constructor, not URI, since the path is already encoded.
final URL serviceUrl = new URL(
scheme,
null,
host,
port,
path,
query,
null
encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString)
);
request = new Request(method, serviceUri.toURL());
request = new Request(method, serviceUrl);
// used to validate that we are talking to the correct worker
request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
@ -278,7 +292,7 @@ public abstract class IndexTaskClient implements AutoCloseable
request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
}
log.debug("HTTP %s: %s", method.getName(), serviceUri.toString());
log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString());
response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
}
catch (IOException | ChannelException ioce) {

View File

@ -42,6 +42,12 @@ import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
@ -75,12 +81,6 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -91,7 +91,6 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
@ -560,7 +559,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
URL url = null;
try {
url = makeWorkerURL(zkWorker.getWorker(), StringUtils.format("/task/%s/shutdown", taskId));
url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/shutdown", taskId);
final StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, url),
RESPONSE_HANDLER,
@ -598,7 +597,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return Optional.absent();
} else {
// Worker is still running this task
final URL url = makeWorkerURL(zkWorker.getWorker(), StringUtils.format("/task/%s/log?offset=%d", taskId, offset));
final URL url = TaskRunnerUtils.makeWorkerURL(
zkWorker.getWorker(),
"/druid/worker/v1/task/%s/log?offset=%d",
taskId,
offset
);
return Optional.of(
new ByteSource()
{
@ -625,18 +629,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
private URL makeWorkerURL(Worker worker, String path)
{
Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
try {
return new URL(StringUtils.format("%s://%s/druid/worker/v1%s", worker.getScheme(), worker.getHost(), path));
}
catch (MalformedURLException e) {
throw Throwables.propagate(e);
}
}
/**
* Adds a task to the pending queue
*/
@ -816,7 +808,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (immutableZkWorker != null &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
== null) {
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
}
}

View File

@ -19,11 +19,20 @@
package org.apache.druid.indexing.overlord;
import org.apache.druid.java.util.emitter.EmittingLogger;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.Executor;
public class TaskRunnerUtils
@ -89,4 +98,20 @@ public class TaskRunnerUtils
}
}
}
public static URL makeWorkerURL(Worker worker, String pathFormat, Object... pathParams)
{
Preconditions.checkArgument(pathFormat.startsWith("/"), "path must start with '/': %s", pathFormat);
final String path = StringUtils.format(
pathFormat,
Arrays.stream(pathParams).map(s -> StringUtils.urlEncode(s.toString())).toArray()
);
try {
return new URI(StringUtils.format("%s://%s%s", worker.getScheme(), worker.getHost(), path)).toURL();
}
catch (URISyntaxException | MalformedURLException e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
@ -64,7 +65,6 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -75,7 +75,6 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Period;
@ -858,7 +857,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return Optional.absent();
} else {
// Worker is still running this task
final URL url = WorkerHolder.makeWorkerURL(worker, StringUtils.format("/druid/worker/v1/task/%s/log?offset=%d", taskId, offset));
final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/log?offset=%d", taskId, offset);
return Optional.of(
new ByteSource()
{

View File

@ -22,12 +22,11 @@ package org.apache.druid.indexing.overlord.hrtr;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
@ -35,7 +34,6 @@ import org.apache.druid.indexing.worker.WorkerHistoryItem;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
@ -47,7 +45,6 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -116,7 +113,7 @@ public class WorkerHolder
smileMapper,
httpClient,
workersSyncExec,
makeWorkerURL(worker, "/"),
TaskRunnerUtils.makeWorkerURL(worker, "/"),
"/druid-internal/v1/worker",
WORKER_SYNC_RESP_TYPE_REF,
config.getSyncRequestTimeout().toStandardDuration().getMillis(),
@ -211,18 +208,6 @@ public class WorkerHolder
this.continuouslyFailedTasksCount.incrementAndGet();
}
public static URL makeWorkerURL(Worker worker, String path)
{
Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
try {
return new URL(StringUtils.format("%s://%s%s", worker.getScheme(), worker.getHost(), path));
}
catch (MalformedURLException e) {
throw Throwables.propagate(e);
}
}
public boolean assignTask(Task task)
{
if (disabled.get()) {
@ -234,7 +219,7 @@ public class WorkerHolder
return false;
}
URL url = makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask");
URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask");
int numTries = config.getAssignRequestMaxRetries();
try {
@ -282,7 +267,7 @@ public class WorkerHolder
public void shutdownTask(String taskId)
{
URL url = makeWorkerURL(worker, StringUtils.format("/druid/worker/v1/task/%s/shutdown", taskId));
final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/shutdown", taskId);
try {
RetryUtils.retry(

View File

@ -79,7 +79,7 @@ public class RemoteTaskRunnerTest
jsonMapper = rtrTestUtils.getObjectMapper();
cf = rtrTestUtils.getCuratorFramework();
task = TestTasks.unending("task");
task = TestTasks.unending("task id with spaces");
}
@After
@ -308,7 +308,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(workerRunningTask(task.getId()));
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals("task"));
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals(task.getId()));
cf.delete().forPath(joiner.join(statusPath, task.getId()));

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import org.apache.druid.indexing.worker.Worker;
import org.junit.Assert;
import org.junit.Test;
import java.net.URL;
public class TaskRunnerUtilsTest
{
@Test
public void testMakeWorkerURL()
{
final URL url = TaskRunnerUtils.makeWorkerURL(
new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0"),
"/druid/worker/v1/task/%s/log",
"foo bar&"
);
Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString());
Assert.assertEquals("1.2.3.4:8290", url.getAuthority());
Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath());
}
}

View File

@ -34,6 +34,7 @@ import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -80,7 +81,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
void doTest()
{
LOG.info("Starting test: ITRealtimeIndexTaskTest");
try {
try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
// the task will run for 3 minutes and then shutdown itself
String task = setShutOffTime(
getTaskAsString(getTaskResource()),
@ -153,9 +154,6 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
String setShutOffTime(String taskAsString, DateTime time)

View File

@ -31,6 +31,7 @@ import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.commons.io.IOUtils;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@ -53,6 +54,11 @@ public abstract class AbstractIndexerTest
@Inject
protected TestQueryHelper queryHelper;
protected Closeable unloader(final String dataSource)
{
return () -> unloadAndKillData(dataSource);
}
protected void unloadAndKillData(final String dataSource)
{
List<String> intervals = coordinator.getSegmentIntervals(dataSource);
@ -68,7 +74,7 @@ public abstract class AbstractIndexerTest
unloadAndKillData(dataSource, first, last);
}
protected void unloadAndKillData(final String dataSource, String start, String end)
private void unloadAndKillData(final String dataSource, String start, String end)
{
// Wait for any existing index tasks to complete before disabling the datasource otherwise
// realtime tasks can get stuck waiting for handoff. https://github.com/apache/incubator-druid/issues/1729

View File

@ -26,6 +26,7 @@ import org.apache.druid.testing.utils.RetryUtil;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.List;
@Guice(moduleFactory = DruidTestModuleFactory.class)
@ -47,7 +48,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
if (intervalsBeforeCompaction.contains(compactedInterval)) {
throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
}
try {
try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
compactData(false);
@ -59,9 +60,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
intervalsBeforeCompaction.sort(null);
checkCompactionIntervals(intervalsBeforeCompaction);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
@Test
@ -70,7 +68,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
loadData();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
intervalsBeforeCompaction.sort(null);
try {
try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
compactData(true);
@ -80,9 +78,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
checkCompactionIntervals(intervalsBeforeCompaction);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
private void loadData() throws Exception

View File

@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITIndexerTest extends AbstractITBatchIndexTest
{
@ -35,7 +37,10 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
@Test
public void testIndexData() throws Exception
{
try {
try (
final Closeable indexCloseable = unloader(INDEX_DATASOURCE);
final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE)
) {
doIndexTestTest(
INDEX_DATASOURCE,
INDEX_TASK,
@ -47,9 +52,5 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
INDEX_QUERIES_RESOURCE
);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
unloadAndKillData(REINDEX_DATASOURCE);
}
}
}

View File

@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITParallelIndexTest extends AbstractITBatchIndexTest
{
@ -33,15 +35,12 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
@Test
public void testIndexData() throws Exception
{
try {
try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
doIndexTestTest(
INDEX_DATASOURCE,
INDEX_TASK,
INDEX_QUERIES_RESOURCE
);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
}

View File

@ -20,12 +20,12 @@
package org.apache.druid.tests.indexer;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
@ -43,6 +43,7 @@ import org.joda.time.DateTime;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
@ -70,10 +71,13 @@ public class ITUnionQueryTest extends AbstractIndexerTest
IntegrationTestingConfig config;
@Test
public void testUnionQuery()
public void testUnionQuery() throws IOException
{
final int numTasks = 3;
final Closer closer = Closer.create();
for (int i = 0; i < numTasks; i++) {
closer.register(unloader(UNION_DATASOURCE + i));
}
try {
// Load 4 datasources with same dimensions
String task = setShutOffTime(
@ -143,16 +147,12 @@ public class ITUnionQueryTest extends AbstractIndexerTest
this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
}
catch (Exception e) {
LOG.error(e, "Error while testing");
throw Throwables.propagate(e);
catch (Throwable e) {
throw closer.rethrow(e);
}
finally {
for (int i = 0; i < numTasks; i++) {
unloadAndKillData(UNION_DATASOURCE + i);
}
closer.close();
}
}
private String setShutOffTime(String taskAsString, DateTime time)

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import javax.annotation.Nullable;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -161,6 +162,16 @@ public class StringUtils
return s;
}
public static String urlEncode(String s)
{
try {
return URLEncoder.encode(s, "UTF-8");
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
private static String removeChar(String s, char c, int firstOccurranceIndex)
{
StringBuilder sb = new StringBuilder(s.length() - 1);
@ -180,6 +191,7 @@ public class StringUtils
* irrelevant to null handling of the data.
*
* @param string the string to test and possibly return
*
* @return {@code string} itself if it is non-null; {@code ""} if it is null
*/
public static String nullToEmptyNonDruidDataString(@Nullable String string)
@ -195,8 +207,9 @@ public class StringUtils
* irrelevant to null handling of the data.
*
* @param string the string to test and possibly return
*
* @return {@code string} itself if it is nonempty; {@code null} if it is
* empty or null
* empty or null
*/
@Nullable
public static String emptyToNullNonDruidDataString(@Nullable String string)

View File

@ -20,7 +20,6 @@
package org.apache.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
@ -33,7 +32,6 @@ import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
@ -42,7 +40,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
public class AsyncManagementForwardingServlet extends AsyncProxyServlet
@ -143,18 +140,15 @@ public class AsyncManagementForwardingServlet extends AsyncProxyServlet
@Override
protected String rewriteTarget(HttpServletRequest request)
{
try {
return new URIBuilder((String) request.getAttribute(BASE_URI_ATTRIBUTE))
.setPath(request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null ?
(String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) : request.getRequestURI())
.setQuery(request.getQueryString()) // No need to encode-decode queryString, it is already encoded
.build()
.toString();
}
catch (URISyntaxException e) {
log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
throw Throwables.propagate(e);
}
final String encodedPath = request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null
? (String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE)
: request.getRequestURI();
return JettyUtils.concatenateForRewrite(
(String) request.getAttribute(BASE_URI_ATTRIBUTE),
encodedPath,
request.getQueryString()
);
}
@Override

View File

@ -22,6 +22,7 @@ package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
@ -32,6 +33,7 @@ import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -48,7 +50,6 @@ import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -64,8 +65,6 @@ import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -365,29 +364,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
request,
(String) request.getAttribute(SCHEME_ATTRIBUTE),
(String) request.getAttribute(HOST_ATTRIBUTE)
).toString();
);
}
protected URI rewriteURI(HttpServletRequest request, String scheme, String host)
protected String rewriteURI(HttpServletRequest request, String scheme, String host)
{
return makeURI(scheme, host, request.getRequestURI(), request.getQueryString());
}
protected static URI makeURI(String scheme, String host, String requestURI, String rawQueryString)
@VisibleForTesting
static String makeURI(String scheme, String host, String requestURI, String rawQueryString)
{
try {
return new URIBuilder()
.setScheme(scheme)
.setHost(host)
.setPath(requestURI)
// No need to encode-decode queryString, it is already encoded
.setQuery(rawQueryString)
.build();
}
catch (URISyntaxException e) {
log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
throw Throwables.propagate(e);
}
return JettyUtils.concatenateForRewrite(
StringUtils.format("%s://%s", scheme, host),
requestURI,
rawQueryString
);
}
@Override

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import javax.annotation.Nullable;
public class JettyUtils
{
/**
* Concatenate URI parts, in a way that is useful for proxy servlets.
*
* @param base base part of the uri, like http://example.com (no trailing slash)
* @param encodedPath encoded path, like you would get from HttpServletRequest's getRequestURI
* @param encodedQueryString encoded query string, like you would get from HttpServletRequest's getQueryString
*/
public static String concatenateForRewrite(
final String base,
final String encodedPath,
@Nullable final String encodedQueryString
)
{
// Query string and path are already encoded, no need for anything fancy beyond string concatenation.
final StringBuilder url = new StringBuilder(base).append(encodedPath);
if (encodedQueryString != null) {
url.append("?").append(encodedQueryString);
}
return url.toString();
}
}

View File

@ -19,14 +19,13 @@
package org.apache.druid.server.http;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.JettyUtils;
import org.apache.druid.server.security.AuthConfig;
import com.google.inject.Provider;
import org.eclipse.jetty.client.HttpClient;
@ -36,8 +35,6 @@ import org.eclipse.jetty.proxy.ProxyServlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URI;
import java.net.URISyntaxException;
/**
* A Proxy servlet that proxies requests to the overlord.
@ -63,23 +60,16 @@ public class OverlordProxyServlet extends ProxyServlet
@Override
protected String rewriteTarget(HttpServletRequest request)
{
try {
final String overlordLeader = druidLeaderClient.findCurrentLeader();
if (overlordLeader == null) {
throw new ISE("Can't find Overlord leader.");
}
String location = StringUtils.format("%s%s", overlordLeader, request.getRequestURI());
if (request.getQueryString() != null) {
location = StringUtils.format("%s?%s", location, request.getQueryString());
}
return new URI(location).toString();
}
catch (URISyntaxException e) {
throw Throwables.propagate(e);
final String overlordLeader = druidLeaderClient.findCurrentLeader();
if (overlordLeader == null) {
throw new ISE("Can't find Overlord leader.");
}
return JettyUtils.concatenateForRewrite(
overlordLeader,
request.getRequestURI(),
request.getQueryString()
);
}
@Override

View File

@ -302,7 +302,7 @@ public class AsyncManagementForwardingServletTest extends BaseJettyTest
overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ=");
HttpURLConnection connection = ((HttpURLConnection)
new URL(StringUtils.format("http://localhost:%d/proxy/overlord/%s", port, overlordExpectedRequest.path))
new URL(StringUtils.format("http://localhost:%d/proxy/overlord%s", port, overlordExpectedRequest.path))
.openConnection());
connection.setRequestMethod(overlordExpectedRequest.method);

View File

@ -348,13 +348,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
)
{
@Override
protected URI rewriteURI(HttpServletRequest request, String scheme, String host)
protected String rewriteURI(HttpServletRequest request, String scheme, String host)
{
String uri = super.rewriteURI(request, scheme, host).toString();
if (uri.contains("/druid/v2")) {
return URI.create(uri.replace("/druid/v2", "/default"));
return URI.create(uri.replace("/druid/v2", "/default")).toString();
}
return URI.create(uri.replace("/proxy", ""));
return URI.create(uri.replace("/proxy", "")).toString();
}
});
//NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152
@ -378,7 +378,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
// test params
Assert.assertEquals(
new URI("http://localhost:1234/some/path?param=1"),
"http://localhost:1234/some/path?param=1",
AsyncQueryForwardingServlet.makeURI("http", "localhost:1234", "/some/path", "param=1")
);
@ -391,20 +391,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
HostAndPort.fromParts("2a00:1450:4007:805::1007", 1234).toString(),
"/some/path",
"param=1&param2=%E2%82%AC"
).toASCIIString()
)
);
// test null query
Assert.assertEquals(
new URI("http://localhost/"),
"http://localhost/",
AsyncQueryForwardingServlet.makeURI("http", "localhost", "/", null)
);
// Test reWrite Encoded interval with timezone info
// decoded parameters 1900-01-01T00:00:00.000+01.00 -> 1900-01-01T00:00:00.000+01:00
Assert.assertEquals(
new URI(
"http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00"),
"http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00",
AsyncQueryForwardingServlet.makeURI(
"http",
"localhost:1234",

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import org.junit.Assert;
import org.junit.Test;
public class JettyUtilsTest
{
@Test
public void testConcatenateForRewrite()
{
Assert.assertEquals(
"http://example.com/foo%20bar?q=baz%20qux",
JettyUtils.concatenateForRewrite(
"http://example.com",
"/foo%20bar",
"q=baz%20qux"
)
);
}
}

View File

@ -37,12 +37,14 @@ public class OverlordProxyServletTest
HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class);
EasyMock.expect(request.getQueryString()).andReturn("param1=test&param2=test2").anyTimes();
EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes();
// %3A is a colon; test to make sure urlencoded paths work right.
EasyMock.expect(request.getRequestURI()).andReturn("/druid/over%3Alord/worker").anyTimes();
EasyMock.replay(druidLeaderClient, request);
URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient, null, null).rewriteTarget(request));
Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test&param2=test2", uri.toString());
Assert.assertEquals("https://overlord:port/druid/over%3Alord/worker?param1=test&param2=test2", uri.toString());
}
}