diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 339e01fd8ca..ab9a6382dcc 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -315,6 +315,11 @@
docker-java-api
3.2.0
+
+ com.github.docker-java
+ docker-java-core
+ 3.2.0
+
io.netty
netty-transport-native-kqueue
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
index d867b209c2a..c8908ab2d09 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
@@ -21,11 +21,14 @@ package org.apache.druid.testing.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.core.DockerClientBuilder;
+import com.github.dockerjava.core.command.ExecStartResultCallback;
import com.github.dockerjava.netty.NettyDockerCmdExecFactory;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -39,8 +42,10 @@ import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import java.io.ByteArrayOutputStream;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -154,12 +159,78 @@ public class DruidClusterAdminClient
waitUntilInstanceReady(config.getRouterUrl());
}
+ public Pair runCommandInCoordinatorContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInCoordinatorTwoContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(COORDINATOR_TWO_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInHistoricalContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInOverlordContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInOverlordTwoContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(OVERLORD_TWO_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInBrokerContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(BROKER_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInRouterContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(ROUTER_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ public Pair runCommandInMiddleManagerContainer(String... cmd) throws Exception
+ {
+ return runCommandInDockerContainer(MIDDLEMANAGER_DOCKER_CONTAINER_NAME, cmd);
+ }
+
+ private Pair runCommandInDockerContainer(String serviceName, String... cmd) throws Exception
+ {
+ DockerClient dockerClient = DockerClientBuilder.getInstance()
+ .withDockerCmdExecFactory((new NettyDockerCmdExecFactory())
+ .withConnectTimeout(10 * 1000))
+ .build();
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(findDockerContainer(dockerClient, serviceName))
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .withCmd(cmd)
+ .exec();
+ dockerClient.execStartCmd(execCreateCmdResponse.getId())
+ .exec(new ExecStartResultCallback(stdout, stderr))
+ .awaitCompletion();
+
+ return new Pair<>(stdout.toString(StandardCharsets.UTF_8.name()), stderr.toString(StandardCharsets.UTF_8.name()));
+ }
+
private void restartDockerContainer(String serviceName)
{
DockerClient dockerClient = DockerClientBuilder.getInstance()
.withDockerCmdExecFactory((new NettyDockerCmdExecFactory())
.withConnectTimeout(10 * 1000))
.build();
+ dockerClient.restartContainerCmd(findDockerContainer(dockerClient, serviceName)).exec();
+ }
+
+ private String findDockerContainer(DockerClient dockerClient, String serviceName)
+ {
+
List containers = dockerClient.listContainersCmd().exec();
Optional containerName = containers.stream()
.filter(container -> Arrays.asList(container.getNames()).contains(serviceName))
@@ -170,7 +241,7 @@ public class DruidClusterAdminClient
LOG.error("Cannot find docker container for " + serviceName);
throw new ISE("Cannot find docker container for " + serviceName);
}
- dockerClient.restartContainerCmd(containerName.get()).exec();
+ return containerName.get();
}
private void waitUntilInstanceReady(final String host)