diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java index 21a009bb230..33dd021a64d 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java @@ -184,7 +184,7 @@ public class CommonCacheNotifier druidNode.getServiceScheme(), druidNode.getHost(), druidNode.getPortToUse(), - StringUtils.format(baseUrl, itemName) + StringUtils.format(baseUrl, StringUtils.urlEncode(itemName)) ); } catch (MalformedURLException mue) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 1ef152b3113..92042a26aa2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -51,7 +51,7 @@ import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.net.Socket; -import java.net.URI; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.concurrent.Callable; @@ -174,12 +174,12 @@ public abstract class IndexTaskClient implements AutoCloseable protected FullResponseHolder submitRequestWithEmptyContent( String taskId, HttpMethod method, - String pathSuffix, - @Nullable String query, + String encodedPathSuffix, + @Nullable String encodedQueryString, boolean retry ) throws IOException, ChannelException, NoTaskLocationException { - return submitRequest(taskId, null, method, pathSuffix, query, new byte[0], retry); + return submitRequest(taskId, null, method, encodedPathSuffix, encodedQueryString, new byte[0], retry); } /** @@ -188,13 +188,21 @@ public abstract class IndexTaskClient implements AutoCloseable protected FullResponseHolder submitJsonRequest( String taskId, HttpMethod method, - String pathSuffix, - @Nullable String query, + String encodedPathSuffix, + @Nullable String encodedQueryString, byte[] content, boolean retry ) throws IOException, ChannelException, NoTaskLocationException { - return submitRequest(taskId, MediaType.APPLICATION_JSON, method, pathSuffix, query, content, retry); + return submitRequest( + taskId, + MediaType.APPLICATION_JSON, + method, + encodedPathSuffix, + encodedQueryString, + content, + retry + ); } /** @@ -203,13 +211,21 @@ public abstract class IndexTaskClient implements AutoCloseable protected FullResponseHolder submitSmileRequest( String taskId, HttpMethod method, - String pathSuffix, - @Nullable String query, + String encodedPathSuffix, + @Nullable String encodedQueryString, byte[] content, boolean retry ) throws IOException, ChannelException, NoTaskLocationException { - return submitRequest(taskId, SmileMediaTypes.APPLICATION_JACKSON_SMILE, method, pathSuffix, query, content, retry); + return submitRequest( + taskId, + SmileMediaTypes.APPLICATION_JACKSON_SMILE, + method, + encodedPathSuffix, + encodedQueryString, + content, + retry + ); } /** @@ -219,8 +235,8 @@ public abstract class IndexTaskClient implements AutoCloseable String taskId, @Nullable String mediaType, // nullable if content is empty HttpMethod method, - String pathSuffix, - @Nullable String query, + String encodedPathSuffix, + @Nullable String encodedQueryString, byte[] content, boolean retry ) throws IOException, ChannelException, NoTaskLocationException @@ -231,7 +247,7 @@ public abstract class IndexTaskClient implements AutoCloseable FullResponseHolder response = null; Request request = null; TaskLocation location = TaskLocation.unknown(); - String path = StringUtils.format("%s/%s/%s", BASE_PATH, taskId, pathSuffix); + String path = StringUtils.format("%s/%s/%s", BASE_PATH, StringUtils.urlEncode(taskId), encodedPathSuffix); Optional status = taskInfoProvider.getTaskStatus(taskId); if (!status.isPresent() || !status.get().isRunnable()) { @@ -260,16 +276,14 @@ public abstract class IndexTaskClient implements AutoCloseable checkConnection(host, port); try { - URI serviceUri = new URI( + // Use URL constructor, not URI, since the path is already encoded. + final URL serviceUrl = new URL( scheme, - null, host, port, - path, - query, - null + encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString) ); - request = new Request(method, serviceUri.toURL()); + request = new Request(method, serviceUrl); // used to validate that we are talking to the correct worker request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId); @@ -278,7 +292,7 @@ public abstract class IndexTaskClient implements AutoCloseable request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content); } - log.debug("HTTP %s: %s", method.getName(), serviceUri.toString()); + log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString()); response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get(); } catch (IOException | ChannelException ioce) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 736cab6e3b7..16b75ed4c38 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -42,6 +42,12 @@ import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.curator.CuratorUtils; import org.apache.druid.curator.cache.PathChildrenCacheFactory; @@ -75,12 +81,6 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -91,7 +91,6 @@ import org.joda.time.Period; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -560,7 +559,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } URL url = null; try { - url = makeWorkerURL(zkWorker.getWorker(), StringUtils.format("/task/%s/shutdown", taskId)); + url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), "/druid/worker/v1/task/%s/shutdown", taskId); final StatusResponseHolder response = httpClient.go( new Request(HttpMethod.POST, url), RESPONSE_HANDLER, @@ -598,7 +597,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return Optional.absent(); } else { // Worker is still running this task - final URL url = makeWorkerURL(zkWorker.getWorker(), StringUtils.format("/task/%s/log?offset=%d", taskId, offset)); + final URL url = TaskRunnerUtils.makeWorkerURL( + zkWorker.getWorker(), + "/druid/worker/v1/task/%s/log?offset=%d", + taskId, + offset + ); return Optional.of( new ByteSource() { @@ -625,18 +629,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } } - private URL makeWorkerURL(Worker worker, String path) - { - Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); - - try { - return new URL(StringUtils.format("%s://%s/druid/worker/v1%s", worker.getScheme(), worker.getHost(), path)); - } - catch (MalformedURLException e) { - throw Throwables.propagate(e); - } - } - /** * Adds a task to the pending queue */ @@ -816,7 +808,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer if (immutableZkWorker != null && workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId()) - == null) { + == null) { assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java index 63a571f97fc..176a3a12996 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java @@ -19,11 +19,20 @@ package org.apache.druid.indexing.overlord; -import org.apache.druid.java.util.emitter.EmittingLogger; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; import java.util.concurrent.Executor; public class TaskRunnerUtils @@ -89,4 +98,20 @@ public class TaskRunnerUtils } } } + + public static URL makeWorkerURL(Worker worker, String pathFormat, Object... pathParams) + { + Preconditions.checkArgument(pathFormat.startsWith("/"), "path must start with '/': %s", pathFormat); + final String path = StringUtils.format( + pathFormat, + Arrays.stream(pathParams).map(s -> StringUtils.urlEncode(s.toString())).toArray() + ); + + try { + return new URI(StringUtils.format("%s://%s%s", worker.getScheme(), worker.getHost(), path)).toURL(); + } + catch (URISyntaxException | MalformedURLException e) { + throw Throwables.propagate(e); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index ae8cd0ddd86..2047d53633f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.framework.CuratorFramework; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; @@ -64,7 +65,6 @@ import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -75,7 +75,6 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Period; @@ -858,7 +857,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return Optional.absent(); } else { // Worker is still running this task - final URL url = WorkerHolder.makeWorkerURL(worker, StringUtils.format("/druid/worker/v1/task/%s/log?offset=%d", taskId, offset)); + final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/log?offset=%d", taskId, offset); return Optional.of( new ByteSource() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index c289a7c8563..48c087c25f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -22,12 +22,11 @@ package org.apache.druid.indexing.overlord.hrtr; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Sets; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; @@ -35,7 +34,6 @@ import org.apache.druid.indexing.worker.WorkerHistoryItem; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; @@ -47,7 +45,6 @@ import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; -import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -116,7 +113,7 @@ public class WorkerHolder smileMapper, httpClient, workersSyncExec, - makeWorkerURL(worker, "/"), + TaskRunnerUtils.makeWorkerURL(worker, "/"), "/druid-internal/v1/worker", WORKER_SYNC_RESP_TYPE_REF, config.getSyncRequestTimeout().toStandardDuration().getMillis(), @@ -211,18 +208,6 @@ public class WorkerHolder this.continuouslyFailedTasksCount.incrementAndGet(); } - public static URL makeWorkerURL(Worker worker, String path) - { - Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); - - try { - return new URL(StringUtils.format("%s://%s%s", worker.getScheme(), worker.getHost(), path)); - } - catch (MalformedURLException e) { - throw Throwables.propagate(e); - } - } - public boolean assignTask(Task task) { if (disabled.get()) { @@ -234,7 +219,7 @@ public class WorkerHolder return false; } - URL url = makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask"); + URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask"); int numTries = config.getAssignRequestMaxRetries(); try { @@ -282,7 +267,7 @@ public class WorkerHolder public void shutdownTask(String taskId) { - URL url = makeWorkerURL(worker, StringUtils.format("/druid/worker/v1/task/%s/shutdown", taskId)); + final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/shutdown", taskId); try { RetryUtils.retry( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 920d401ee5a..1616bb44093 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -79,7 +79,7 @@ public class RemoteTaskRunnerTest jsonMapper = rtrTestUtils.getObjectMapper(); cf = rtrTestUtils.getCuratorFramework(); - task = TestTasks.unending("task"); + task = TestTasks.unending("task id with spaces"); } @After @@ -308,7 +308,7 @@ public class RemoteTaskRunnerTest Assert.assertTrue(workerRunningTask(task.getId())); - Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals("task")); + Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals(task.getId())); cf.delete().forPath(joiner.join(statusPath, task.getId())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java new file mode 100644 index 00000000000..7c02946145e --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.indexing.worker.Worker; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; + +public class TaskRunnerUtilsTest +{ + @Test + public void testMakeWorkerURL() + { + final URL url = TaskRunnerUtils.makeWorkerURL( + new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0"), + "/druid/worker/v1/task/%s/log", + "foo bar&" + ); + Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString()); + Assert.assertEquals("1.2.3.4:8290", url.getAuthority()); + Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath()); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index 0995dba495b..fb9681fdee4 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -34,6 +34,7 @@ import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.io.Closeable; import java.io.InputStream; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -80,7 +81,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes void doTest() { LOG.info("Starting test: ITRealtimeIndexTaskTest"); - try { + try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { // the task will run for 3 minutes and then shutdown itself String task = setShutOffTime( getTaskAsString(getTaskResource()), @@ -153,9 +154,6 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes catch (Exception e) { throw Throwables.propagate(e); } - finally { - unloadAndKillData(INDEX_DATASOURCE); - } } String setShutOffTime(String taskAsString, DateTime time) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index a689567cdd1..d59c383bbcd 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -31,6 +31,7 @@ import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.commons.io.IOUtils; import org.joda.time.Interval; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -53,6 +54,11 @@ public abstract class AbstractIndexerTest @Inject protected TestQueryHelper queryHelper; + protected Closeable unloader(final String dataSource) + { + return () -> unloadAndKillData(dataSource); + } + protected void unloadAndKillData(final String dataSource) { List intervals = coordinator.getSegmentIntervals(dataSource); @@ -68,7 +74,7 @@ public abstract class AbstractIndexerTest unloadAndKillData(dataSource, first, last); } - protected void unloadAndKillData(final String dataSource, String start, String end) + private void unloadAndKillData(final String dataSource, String start, String end) { // Wait for any existing index tasks to complete before disabling the datasource otherwise // realtime tasks can get stuck waiting for handoff. https://github.com/apache/incubator-druid/issues/1729 diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 2f9078aeb16..0901d39daf8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -26,6 +26,7 @@ import org.apache.druid.testing.utils.RetryUtil; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.Closeable; import java.util.List; @Guice(moduleFactory = DruidTestModuleFactory.class) @@ -47,7 +48,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest if (intervalsBeforeCompaction.contains(compactedInterval)) { throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval); } - try { + try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); compactData(false); @@ -59,9 +60,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest intervalsBeforeCompaction.sort(null); checkCompactionIntervals(intervalsBeforeCompaction); } - finally { - unloadAndKillData(INDEX_DATASOURCE); - } } @Test @@ -70,7 +68,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest loadData(); final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); intervalsBeforeCompaction.sort(null); - try { + try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); compactData(true); @@ -80,9 +78,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest checkCompactionIntervals(intervalsBeforeCompaction); } - finally { - unloadAndKillData(INDEX_DATASOURCE); - } } private void loadData() throws Exception diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 2a7bc64fa36..63681e207b6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.Closeable; + @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITIndexerTest extends AbstractITBatchIndexTest { @@ -35,7 +37,10 @@ public class ITIndexerTest extends AbstractITBatchIndexTest @Test public void testIndexData() throws Exception { - try { + try ( + final Closeable indexCloseable = unloader(INDEX_DATASOURCE); + final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE) + ) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, @@ -47,9 +52,5 @@ public class ITIndexerTest extends AbstractITBatchIndexTest INDEX_QUERIES_RESOURCE ); } - finally { - unloadAndKillData(INDEX_DATASOURCE); - unloadAndKillData(REINDEX_DATASOURCE); - } } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index 618084d9b10..b844acd3e68 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.Closeable; + @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITParallelIndexTest extends AbstractITBatchIndexTest { @@ -33,15 +35,12 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest @Test public void testIndexData() throws Exception { - try { + try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, INDEX_QUERIES_RESOURCE ); } - finally { - unloadAndKillData(INDEX_DATASOURCE); - } } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java index a1bfb4c2fc1..dc5017f48db 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java @@ -20,12 +20,12 @@ package org.apache.druid.tests.indexer; import com.beust.jcommander.internal.Lists; -import com.google.common.base.Throwables; import com.google.inject.Inject; import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; @@ -43,6 +43,7 @@ import org.joda.time.DateTime; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.List; @@ -70,10 +71,13 @@ public class ITUnionQueryTest extends AbstractIndexerTest IntegrationTestingConfig config; @Test - public void testUnionQuery() + public void testUnionQuery() throws IOException { final int numTasks = 3; - + final Closer closer = Closer.create(); + for (int i = 0; i < numTasks; i++) { + closer.register(unloader(UNION_DATASOURCE + i)); + } try { // Load 4 datasources with same dimensions String task = setShutOffTime( @@ -143,16 +147,12 @@ public class ITUnionQueryTest extends AbstractIndexerTest this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2); } - catch (Exception e) { - LOG.error(e, "Error while testing"); - throw Throwables.propagate(e); + catch (Throwable e) { + throw closer.rethrow(e); } finally { - for (int i = 0; i < numTasks; i++) { - unloadAndKillData(UNION_DATASOURCE + i); - } + closer.close(); } - } private String setShutOffTime(String taskAsString, DateTime time) diff --git a/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 12a47ab7921..b69f81eff09 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import javax.annotation.Nullable; import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -161,6 +162,16 @@ public class StringUtils return s; } + public static String urlEncode(String s) + { + try { + return URLEncoder.encode(s, "UTF-8"); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + private static String removeChar(String s, char c, int firstOccurranceIndex) { StringBuilder sb = new StringBuilder(s.length() - 1); @@ -180,6 +191,7 @@ public class StringUtils * irrelevant to null handling of the data. * * @param string the string to test and possibly return + * * @return {@code string} itself if it is non-null; {@code ""} if it is null */ public static String nullToEmptyNonDruidDataString(@Nullable String string) @@ -195,8 +207,9 @@ public class StringUtils * irrelevant to null handling of the data. * * @param string the string to test and possibly return + * * @return {@code string} itself if it is nonempty; {@code null} if it is - * empty or null + * empty or null */ @Nullable public static String emptyToNullNonDruidDataString(@Nullable String string) diff --git a/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java b/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java index 6454da260ea..3bd31c54a74 100644 --- a/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java +++ b/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java @@ -20,7 +20,6 @@ package org.apache.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.Provider; @@ -33,7 +32,6 @@ import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.security.AuthConfig; -import org.apache.http.client.utils.URIBuilder; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.proxy.AsyncProxyServlet; @@ -42,7 +40,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; public class AsyncManagementForwardingServlet extends AsyncProxyServlet @@ -143,18 +140,15 @@ public class AsyncManagementForwardingServlet extends AsyncProxyServlet @Override protected String rewriteTarget(HttpServletRequest request) { - try { - return new URIBuilder((String) request.getAttribute(BASE_URI_ATTRIBUTE)) - .setPath(request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null ? - (String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) : request.getRequestURI()) - .setQuery(request.getQueryString()) // No need to encode-decode queryString, it is already encoded - .build() - .toString(); - } - catch (URISyntaxException e) { - log.error(e, "Unable to rewrite URI [%s]", e.getMessage()); - throw Throwables.propagate(e); - } + final String encodedPath = request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null + ? (String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) + : request.getRequestURI(); + + return JettyUtils.concatenateForRewrite( + (String) request.getAttribute(BASE_URI_ATTRIBUTE), + encodedPath, + request.getQueryString() + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index fa41ada5563..8546d845894 100644 --- a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -22,6 +22,7 @@ package org.apache.druid.server; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; @@ -32,6 +33,7 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -48,7 +50,6 @@ import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authenticator; import org.apache.druid.server.security.AuthenticatorMapper; -import org.apache.http.client.utils.URIBuilder; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -64,8 +65,6 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -365,29 +364,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu request, (String) request.getAttribute(SCHEME_ATTRIBUTE), (String) request.getAttribute(HOST_ATTRIBUTE) - ).toString(); + ); } - protected URI rewriteURI(HttpServletRequest request, String scheme, String host) + protected String rewriteURI(HttpServletRequest request, String scheme, String host) { return makeURI(scheme, host, request.getRequestURI(), request.getQueryString()); } - protected static URI makeURI(String scheme, String host, String requestURI, String rawQueryString) + @VisibleForTesting + static String makeURI(String scheme, String host, String requestURI, String rawQueryString) { - try { - return new URIBuilder() - .setScheme(scheme) - .setHost(host) - .setPath(requestURI) - // No need to encode-decode queryString, it is already encoded - .setQuery(rawQueryString) - .build(); - } - catch (URISyntaxException e) { - log.error(e, "Unable to rewrite URI [%s]", e.getMessage()); - throw Throwables.propagate(e); - } + return JettyUtils.concatenateForRewrite( + StringUtils.format("%s://%s", scheme, host), + requestURI, + rawQueryString + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/JettyUtils.java b/server/src/main/java/org/apache/druid/server/JettyUtils.java new file mode 100644 index 00000000000..9374fb07d93 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/JettyUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import javax.annotation.Nullable; + +public class JettyUtils +{ + /** + * Concatenate URI parts, in a way that is useful for proxy servlets. + * + * @param base base part of the uri, like http://example.com (no trailing slash) + * @param encodedPath encoded path, like you would get from HttpServletRequest's getRequestURI + * @param encodedQueryString encoded query string, like you would get from HttpServletRequest's getQueryString + */ + public static String concatenateForRewrite( + final String base, + final String encodedPath, + @Nullable final String encodedQueryString + ) + { + // Query string and path are already encoded, no need for anything fancy beyond string concatenation. + + final StringBuilder url = new StringBuilder(base).append(encodedPath); + + if (encodedQueryString != null) { + url.append("?").append(encodedQueryString); + } + + return url.toString(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java b/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java index 0b4a12dd276..8ed6ac087ca 100644 --- a/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java +++ b/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java @@ -19,14 +19,13 @@ package org.apache.druid.server.http; -import com.google.common.base.Throwables; import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.JettyUtils; import org.apache.druid.server.security.AuthConfig; import com.google.inject.Provider; import org.eclipse.jetty.client.HttpClient; @@ -36,8 +35,6 @@ import org.eclipse.jetty.proxy.ProxyServlet; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.net.URI; -import java.net.URISyntaxException; /** * A Proxy servlet that proxies requests to the overlord. @@ -63,23 +60,16 @@ public class OverlordProxyServlet extends ProxyServlet @Override protected String rewriteTarget(HttpServletRequest request) { - try { - final String overlordLeader = druidLeaderClient.findCurrentLeader(); - if (overlordLeader == null) { - throw new ISE("Can't find Overlord leader."); - } - - String location = StringUtils.format("%s%s", overlordLeader, request.getRequestURI()); - - if (request.getQueryString() != null) { - location = StringUtils.format("%s?%s", location, request.getQueryString()); - } - - return new URI(location).toString(); - } - catch (URISyntaxException e) { - throw Throwables.propagate(e); + final String overlordLeader = druidLeaderClient.findCurrentLeader(); + if (overlordLeader == null) { + throw new ISE("Can't find Overlord leader."); } + + return JettyUtils.concatenateForRewrite( + overlordLeader, + request.getRequestURI(), + request.getQueryString() + ); } @Override diff --git a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java index fa6d540ac03..a2f4261763d 100644 --- a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java +++ b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java @@ -302,7 +302,7 @@ public class AsyncManagementForwardingServletTest extends BaseJettyTest overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); HttpURLConnection connection = ((HttpURLConnection) - new URL(StringUtils.format("http://localhost:%d/proxy/overlord/%s", port, overlordExpectedRequest.path)) + new URL(StringUtils.format("http://localhost:%d/proxy/overlord%s", port, overlordExpectedRequest.path)) .openConnection()); connection.setRequestMethod(overlordExpectedRequest.method); diff --git a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 68082c30610..49cb2a9608c 100644 --- a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -348,13 +348,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest ) { @Override - protected URI rewriteURI(HttpServletRequest request, String scheme, String host) + protected String rewriteURI(HttpServletRequest request, String scheme, String host) { String uri = super.rewriteURI(request, scheme, host).toString(); if (uri.contains("/druid/v2")) { - return URI.create(uri.replace("/druid/v2", "/default")); + return URI.create(uri.replace("/druid/v2", "/default")).toString(); } - return URI.create(uri.replace("/proxy", "")); + return URI.create(uri.replace("/proxy", "")).toString(); } }); //NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152 @@ -378,7 +378,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest // test params Assert.assertEquals( - new URI("http://localhost:1234/some/path?param=1"), + "http://localhost:1234/some/path?param=1", AsyncQueryForwardingServlet.makeURI("http", "localhost:1234", "/some/path", "param=1") ); @@ -391,20 +391,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest HostAndPort.fromParts("2a00:1450:4007:805::1007", 1234).toString(), "/some/path", "param=1¶m2=%E2%82%AC" - ).toASCIIString() + ) ); // test null query Assert.assertEquals( - new URI("http://localhost/"), + "http://localhost/", AsyncQueryForwardingServlet.makeURI("http", "localhost", "/", null) ); // Test reWrite Encoded interval with timezone info // decoded parameters 1900-01-01T00:00:00.000+01.00 -> 1900-01-01T00:00:00.000+01:00 Assert.assertEquals( - new URI( - "http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00"), + "http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00", AsyncQueryForwardingServlet.makeURI( "http", "localhost:1234", diff --git a/server/src/test/java/org/apache/druid/server/JettyUtilsTest.java b/server/src/test/java/org/apache/druid/server/JettyUtilsTest.java new file mode 100644 index 00000000000..bd6d86f36bc --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/JettyUtilsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.junit.Assert; +import org.junit.Test; + +public class JettyUtilsTest +{ + @Test + public void testConcatenateForRewrite() + { + Assert.assertEquals( + "http://example.com/foo%20bar?q=baz%20qux", + JettyUtils.concatenateForRewrite( + "http://example.com", + "/foo%20bar", + "q=baz%20qux" + ) + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java b/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java index dd364b74361..7e99d827bed 100644 --- a/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java +++ b/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java @@ -37,12 +37,14 @@ public class OverlordProxyServletTest HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); EasyMock.expect(request.getQueryString()).andReturn("param1=test¶m2=test2").anyTimes(); - EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes(); + + // %3A is a colon; test to make sure urlencoded paths work right. + EasyMock.expect(request.getRequestURI()).andReturn("/druid/over%3Alord/worker").anyTimes(); EasyMock.replay(druidLeaderClient, request); URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient, null, null).rewriteTarget(request)); - Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString()); + Assert.assertEquals("https://overlord:port/druid/over%3Alord/worker?param1=test¶m2=test2", uri.toString()); } }