integration test for coordinator and overlord leadership client (#10680)

* integration test for coordinator and overlord leadership, added sys.servers is_leader column

* docs

* remove not needed

* fix comments

* fix compile heh

* oof

* revert unintended

* fix tests, split out docker-compose file selection from starting cluster, use docker-compose down to stop cluster

* fixes

* style

* dang

* heh

* scripts are hard

* fix spelling

* fix a setting that should not have mattered (the IP was already wrong); log when a test fails

* needs more heap

* fix merge

* less aggro
Clint Wylie 2020-12-17 22:50:12 -08:00 committed by GitHub
parent 796c25532e
commit da0eabaa01
48 changed files with 1386 additions and 385 deletions

View File

@ -483,13 +483,19 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration tests"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
# END - Integration tests for Compile with Java 8 and Run with Java 8
# START - Integration tests for Compile with Java 8 and Run with Java 11
@ -546,7 +552,12 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- &integration_batch_index_k8s
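
These new jobs select tests by TestNG group (`-Dgroups=high-availability`). As a minimal sketch of how an integration test class opts into such a group — class and method names here are illustrative, not necessarily those added by this commit:

import org.testng.annotations.Test;

public class ExampleLeadershipIT
{
  // Druid's integration tests keep group names in a constants class;
  // the literal is inlined here to keep the example self-contained.
  private static final String HIGH_AVAILABILITY = "high-availability";

  @Test(groups = HIGH_AVAILABILITY)
  public void testLeaderElection()
  {
    // A real test would query each coordinator and overlord and assert
    // that exactly one of each reports itself as leader.
  }
}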

View File

@ -1139,6 +1139,7 @@ Servers table lists all discovered servers in the cluster.
|tier|STRING|Distribution tier; see [druid.server.tier](../configuration/index.md#historical-general-configuration). Only valid for HISTORICAL type; for other types it's null|
|current_size|LONG|Current size of segments in bytes on this server. Only valid for HISTORICAL type; for other types it's 0|
|max_size|LONG|Maximum size in bytes this server recommends to assign to segments; see [druid.server.maxSize](../configuration/index.md#historical-general-configuration). Only valid for HISTORICAL type; for other types it's 0|
|is_leader|LONG|1 if the server is currently the leader of services that support leadership, 0 if it is not the leader, or the default long value (0 or null, depending on `druid.generic.useDefaultValueForNull`) if the server type has no concept of leadership|

To retrieve information about all servers, use the query:
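
The new column makes leader checks scriptable. For illustration, a minimal Java sketch that runs such a query through Druid's Avatica JDBC endpoint — the router address is an assumption, and the Avatica driver must be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LeaderCheck
{
  public static void main(String[] args) throws Exception
  {
    // Assumed router location; adjust for your cluster.
    String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT server, server_type, is_leader FROM sys.servers WHERE is_leader = 1")) {
      while (rs.next()) {
        // Only services with a leadership concept (coordinator, overlord)
        // can match is_leader = 1.
        System.out.println(rs.getString("server") + " leads as " + rs.getString("server_type"));
      }
    }
  }
}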

View File

@ -21,8 +21,10 @@ package org.apache.druid.security.basic;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorUser;
@ -66,6 +68,9 @@ public class BasicAuthUtils
public static final int DEFAULT_CREDENTIAL_CACHE_SIZE = 100;
public static final int KEY_LENGTH = 512;
public static final String ALGORITHM = "PBKDF2WithHmacSHA512";
public static final int MAX_INIT_RETRIES = 2;
public static final Predicate<Throwable> SHOULD_RETRY_INIT =
(throwable) -> throwable instanceof BasicSecurityDBResourceException;
public static final TypeReference<Map<String, BasicAuthenticatorUser>> AUTHENTICATOR_USER_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, BasicAuthenticatorUser>>()
@ -277,4 +282,14 @@ public class BasicAuthUtils
throw new ISE(ioe, "Couldn't serialize authorizer roleMap!");
}
}
public static void maybeInitialize(final RetryUtils.Task<?> task)
{
try {
RetryUtils.retry(task, SHOULD_RETRY_INIT, MAX_INIT_RETRIES);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
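
The helper retries the supplied task only when it fails with a BasicSecurityDBResourceException (per SHOULD_RETRY_INIT), up to MAX_INIT_RETRIES attempts, and wraps any other failure in a RuntimeException. A hypothetical caller — initializeUserMaps() is a stand-in for real setup work:

import org.apache.druid.security.basic.BasicAuthUtils;

public class ExampleInitializer
{
  public void start()
  {
    BasicAuthUtils.maybeInitialize(
        () -> {
          // Retried only if this throws BasicSecurityDBResourceException,
          // e.g. while the metadata store tables are still being created.
          initializeUserMaps();
          // RetryUtils.Task must return a value; it is discarded here.
          return true;
        }
    );
  }

  private void initializeUserMaps()
  {
    // Placeholder for real metadata-store initialization.
  }
}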

View File

@ -119,46 +119,50 @@ public class CoordinatorBasicAuthenticatorMetadataStorageUpdater implements Basi
try {
LOG.info("Starting CoordinatorBasicAuthenticatorMetadataStorageUpdater.");
for (Map.Entry<String, Authenticator> entry : authenticatorMapper.getAuthenticatorMap().entrySet()) {
Authenticator authenticator = entry.getValue();
if (authenticator instanceof BasicHTTPAuthenticator) {
String authenticatorName = entry.getKey();
authenticatorPrefixes.add(authenticatorName);
BasicHTTPAuthenticator basicHTTPAuthenticator = (BasicHTTPAuthenticator) authenticator;
BasicAuthDBConfig dbConfig = basicHTTPAuthenticator.getDbConfig();
byte[] userMapBytes = getCurrentUserMapBytes(authenticatorName);
Map<String, BasicAuthenticatorUser> userMap = BasicAuthUtils.deserializeAuthenticatorUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authenticatorName, new BasicAuthenticatorUserMapBundle(userMap, userMapBytes));
BasicAuthUtils.maybeInitialize(
() -> {
for (Map.Entry<String, Authenticator> entry : authenticatorMapper.getAuthenticatorMap().entrySet()) {
Authenticator authenticator = entry.getValue();
if (authenticator instanceof BasicHTTPAuthenticator) {
String authenticatorName = entry.getKey();
authenticatorPrefixes.add(authenticatorName);
BasicHTTPAuthenticator basicHTTPAuthenticator = (BasicHTTPAuthenticator) authenticator;
BasicAuthDBConfig dbConfig = basicHTTPAuthenticator.getDbConfig();
byte[] userMapBytes = getCurrentUserMapBytes(authenticatorName);
Map<String, BasicAuthenticatorUser> userMap = BasicAuthUtils.deserializeAuthenticatorUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authenticatorName, new BasicAuthenticatorUserMapBundle(userMap, userMapBytes));
if (dbConfig.getInitialAdminPassword() != null && !userMap.containsKey(BasicAuthUtils.ADMIN_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.ADMIN_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.ADMIN_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialAdminPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
if (dbConfig.getInitialAdminPassword() != null && !userMap.containsKey(BasicAuthUtils.ADMIN_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.ADMIN_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.ADMIN_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialAdminPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
if (dbConfig.getInitialInternalClientPassword() != null
&& !userMap.containsKey(BasicAuthUtils.INTERNAL_USER_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.INTERNAL_USER_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.INTERNAL_USER_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialInternalClientPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
}
}
if (dbConfig.getInitialInternalClientPassword() != null
&& !userMap.containsKey(BasicAuthUtils.INTERNAL_USER_NAME)) {
createUserInternal(authenticatorName, BasicAuthUtils.INTERNAL_USER_NAME);
setUserCredentialsInternal(
authenticatorName,
BasicAuthUtils.INTERNAL_USER_NAME,
new BasicAuthenticatorCredentialUpdate(
dbConfig.getInitialInternalClientPassword().getPassword(),
BasicAuthUtils.DEFAULT_KEY_ITERATIONS
)
);
}
}
}
return true;
});
ScheduledExecutors.scheduleWithFixedDelay(
exec,

View File

@ -143,42 +143,52 @@ public class CoordinatorBasicAuthorizerMetadataStorageUpdater implements BasicAu
try {
LOG.info("Starting CoordinatorBasicAuthorizerMetadataStorageUpdater");
for (Map.Entry<String, Authorizer> entry : authorizerMapper.getAuthorizerMap().entrySet()) {
Authorizer authorizer = entry.getValue();
if (authorizer instanceof BasicRoleBasedAuthorizer) {
BasicRoleBasedAuthorizer basicRoleBasedAuthorizer = (BasicRoleBasedAuthorizer) authorizer;
BasicAuthDBConfig dbConfig = basicRoleBasedAuthorizer.getDbConfig();
String authorizerName = entry.getKey();
authorizerNames.add(authorizerName);
BasicAuthUtils.maybeInitialize(
() -> {
for (Map.Entry<String, Authorizer> entry : authorizerMapper.getAuthorizerMap().entrySet()) {
Authorizer authorizer = entry.getValue();
if (authorizer instanceof BasicRoleBasedAuthorizer) {
BasicRoleBasedAuthorizer basicRoleBasedAuthorizer = (BasicRoleBasedAuthorizer) authorizer;
BasicAuthDBConfig dbConfig = basicRoleBasedAuthorizer.getDbConfig();
String authorizerName = entry.getKey();
authorizerNames.add(authorizerName);
byte[] userMapBytes = getCurrentUserMapBytes(authorizerName);
Map<String, BasicAuthorizerUser> userMap = BasicAuthUtils.deserializeAuthorizerUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authorizerName, new BasicAuthorizerUserMapBundle(userMap, userMapBytes));
byte[] userMapBytes = getCurrentUserMapBytes(authorizerName);
Map<String, BasicAuthorizerUser> userMap = BasicAuthUtils.deserializeAuthorizerUserMap(
objectMapper,
userMapBytes
);
cachedUserMaps.put(authorizerName, new BasicAuthorizerUserMapBundle(userMap, userMapBytes));
byte[] groupMappingMapBytes = getCurrentGroupMappingMapBytes(authorizerName);
Map<String, BasicAuthorizerGroupMapping> groupMappingMap = BasicAuthUtils.deserializeAuthorizerGroupMappingMap(
objectMapper,
groupMappingMapBytes
);
cachedGroupMappingMaps.put(authorizerName, new BasicAuthorizerGroupMappingMapBundle(groupMappingMap, groupMappingMapBytes));
byte[] groupMappingMapBytes = getCurrentGroupMappingMapBytes(authorizerName);
Map<String, BasicAuthorizerGroupMapping> groupMappingMap = BasicAuthUtils.deserializeAuthorizerGroupMappingMap(
objectMapper,
groupMappingMapBytes
);
cachedGroupMappingMaps.put(
authorizerName,
new BasicAuthorizerGroupMappingMapBundle(
groupMappingMap,
groupMappingMapBytes
)
);
byte[] roleMapBytes = getCurrentRoleMapBytes(authorizerName);
Map<String, BasicAuthorizerRole> roleMap = BasicAuthUtils.deserializeAuthorizerRoleMap(
objectMapper,
roleMapBytes
);
cachedRoleMaps.put(authorizerName, new BasicAuthorizerRoleMapBundle(roleMap, roleMapBytes));
byte[] roleMapBytes = getCurrentRoleMapBytes(authorizerName);
Map<String, BasicAuthorizerRole> roleMap = BasicAuthUtils.deserializeAuthorizerRoleMap(
objectMapper,
roleMapBytes
);
cachedRoleMaps.put(authorizerName, new BasicAuthorizerRoleMapBundle(roleMap, roleMapBytes));
initSuperUsersAndGroupMapping(authorizerName, userMap, roleMap, groupMappingMap,
dbConfig.getInitialAdminUser(),
dbConfig.getInitialAdminRole(),
dbConfig.getInitialAdminGroupMapping()
);
}
}
initSuperUsersAndGroupMapping(authorizerName, userMap, roleMap, groupMappingMap,
dbConfig.getInitialAdminUser(),
dbConfig.getInitialAdminRole(),
dbConfig.getInitialAdminGroupMapping()
);
}
}
return true;
});
ScheduledExecutors.scheduleWithFixedDelay(
exec,

View File

@ -484,6 +484,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
final String taskType = task.isPresent() ? task.get().getType() : null;
final TaskStatus taskStatus = taskRunner.getStatus(taskId);
if (taskStatus != null) {
return new TaskStatusResponse(
taskId,

View File

@ -14,7 +14,61 @@
# limitations under the License.
version: "2.2"
# IP address ranges:
# 172.172.172.2: zookeeper + kafka (dockerfile depends on this)
# 172.172.172.3: metadata store
# 172.172.172.10-19: overlord
# 172.172.172.20-29: coordinator
# 172.172.172.30-39: historicals
# 172.172.172.40-49: middle managers
# 172.172.172.50-59: indexers
# 172.172.172.60-69: brokers
# 172.172.172.70-79: routers
# 172.172.172.101+: hadoop, other supporting infra
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24
services:
### supporting infra:
druid-zookeeper-kafka:
image: druid/cluster
container_name: druid-zookeeper-kafka
ports:
- 2181:2181
- 9092:9092
- 9093:9093
networks:
druid-it-net:
## The Dockerfile appears to depend on this address for this container
ipv4_address: 172.172.172.2
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/zookeeper.conf:/usr/lib/druid/conf/zookeeper.conf
- ./service-supervisords/kafka.conf:/usr/lib/druid/conf/kafka.conf
env_file:
- ./environment-configs/common
druid-metadata-storage:
image: druid/cluster
container_name: druid-metadata-storage
ports:
- 3306:3306
networks:
druid-it-net:
ipv4_address: 172.172.172.3
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/metadata-storage.conf:/usr/lib/druid/conf/metadata-storage.conf
env_file:
- ./environment-configs/common
druid-it-hadoop:
image: druid-it/hadoop:2.8.5
container_name: druid-it-hadoop
@ -45,7 +99,7 @@ services:
- 51111:51111
networks:
druid-it-net:
ipv4_address: 172.172.172.15
ipv4_address: 172.172.172.101
privileged: true
volumes:
- ${HOME}/shared:/shared
@ -55,45 +109,14 @@ services:
/etc/bootstrap.sh && \
tail -f /dev/null'"
druid-zookeeper-kafka:
image: druid/cluster
container_name: druid-zookeeper-kafka
ports:
- 2181:2181
- 9092:9092
- 9093:9093
networks:
druid-it-net:
ipv4_address: 172.172.172.2
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/zookeeper.conf:/usr/lib/druid/conf/zookeeper.conf
- ./service-supervisords/kafka.conf:/usr/lib/druid/conf/kafka.conf
env_file:
- ./environment-configs/common
druid-metadata-storage:
image: druid/cluster
container_name: druid-metadata-storage
ports:
- 3306:3306
networks:
druid-it-net:
ipv4_address: 172.172.172.3
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/metadata-storage.conf:/usr/lib/druid/conf/metadata-storage.conf
env_file:
- ./environment-configs/common
### overlords
druid-overlord:
image: druid/cluster
container_name: druid-overlord
hostname: druid-overlord
networks:
druid-it-net:
ipv4_address: 172.172.172.4
ipv4_address: 172.172.172.10
ports:
- 8090:8090
- 8290:8290
@ -106,12 +129,33 @@ services:
- ./environment-configs/common
- ./environment-configs/overlord
druid-overlord-two:
image: druid/cluster
container_name: druid-overlord-two
hostname: druid-overlord-two
networks:
druid-it-net:
ipv4_address: 172.172.172.11
ports:
- 8590:8090
- 8790:8290
- 6009:5009
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/overlord
### coordinators
druid-coordinator:
image: druid/cluster
container_name: druid-coordinator
hostname: druid-coordinator
networks:
druid-it-net:
ipv4_address: 172.172.172.5
ipv4_address: 172.172.172.20
ports:
- 8081:8081
- 8281:8281
@ -119,17 +163,38 @@ services:
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid-overlord.conf
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/coordinator
druid-coordinator-two:
image: druid/cluster
container_name: druid-coordinator-two
hostname: druid-coordinator-two
networks:
druid-it-net:
ipv4_address: 172.172.172.21
ports:
- 8581:8081
- 8781:8281
- 6006:5006
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/coordinator
### historicals
druid-historical:
image: druid/cluster
container_name: druid-historical
hostname: druid-historical
networks:
druid-it-net:
ipv4_address: 172.172.172.6
ipv4_address: 172.172.172.30
ports:
- 8083:8083
- 8283:8283
@ -142,12 +207,14 @@ services:
- ./environment-configs/common
- ./environment-configs/historical
### middle managers
druid-middlemanager:
image: druid/cluster
container_name: druid-middlemanager
hostname: druid-middlemanager
networks:
druid-it-net:
ipv4_address: 172.172.172.7
ipv4_address: 172.172.172.40
ports:
- 5008:5008
- 8091:8091
@ -173,12 +240,14 @@ services:
- ./environment-configs/common
- ./environment-configs/middlemanager
### indexers
druid-indexer:
image: druid/cluster
container_name: druid-indexer
hostname: druid-indexer
networks:
druid-it-net:
ipv4_address: 172.172.172.8
ipv4_address: 172.172.172.50
ports:
- 5008:5008
- 8091:8091
@ -192,12 +261,14 @@ services:
- ./environment-configs/common
- ./environment-configs/indexer
### brokers
druid-broker:
image: druid/cluster
hostname: druid-broker
container_name: druid-broker
networks:
druid-it-net:
ipv4_address: 172.172.172.9
ipv4_address: 172.172.172.60
ports:
- 5005:5005
- 8082:8082
@ -210,12 +281,14 @@ services:
- ./environment-configs/common
- ./environment-configs/broker
### routers
druid-router:
image: druid/cluster
container_name: druid-router
hostname: druid-router
networks:
druid-it-net:
ipv4_address: 172.172.172.10
ipv4_address: 172.172.172.70
ports:
- 5004:5004
- 8888:8888
@ -233,7 +306,7 @@ services:
container_name: druid-router-permissive-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.11
ipv4_address: 172.172.172.71
ports:
- 5001:5001
- 8889:8889
@ -251,7 +324,7 @@ services:
container_name: druid-router-no-client-auth-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.12
ipv4_address: 172.172.172.72
ports:
- 5002:5002
- 8890:8890
@ -269,7 +342,7 @@ services:
container_name: druid-router-custom-check-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.13
ipv4_address: 172.172.172.73
ports:
- 5003:5003
- 8891:8891
@ -281,10 +354,3 @@ services:
env_file:
- ./environment-configs/common
- ./environment-configs/router-custom-check-tls
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24

View File

@ -35,9 +35,6 @@ services:
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@ -48,10 +45,6 @@ services:
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
@ -63,8 +56,6 @@ services:
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
@ -74,9 +65,6 @@ services:
service: druid-indexer
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-overlord:druid-overlord
depends_on:
- druid-zookeeper-kafka
- druid-overlord
@ -87,10 +75,6 @@ services:
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-indexer:druid-indexer
- druid-historical:druid-historical
depends_on:
- druid-zookeeper-kafka
- druid-indexer
@ -102,10 +86,6 @@ services:
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator

View File

@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.2"
services:
druid-zookeeper-kafka:
extends:
file: docker-compose.base.yml
service: druid-zookeeper-kafka
druid-metadata-storage:
extends:
file: docker-compose.base.yml
service: druid-metadata-storage
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-zookeeper-kafka
druid-coordinator:
extends:
file: docker-compose.base.yml
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- DRUID_LOG_PATH=/shared/logs/ha-coordinator-one.log
- druid_manager_config_pollDuration=PT10S
- druid_manager_rules_pollDuration=PT10S
- druid_manager_segments_pollDuration=PT10S
- druid_coordinator_period=PT10S
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
druid-coordinator-two:
extends:
file: docker-compose.base.yml
service: druid-coordinator-two
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- DRUID_LOG_PATH=/shared/logs/ha-coordinator-two.log
- druid_host=druid-coordinator-two
- druid_manager_config_pollDuration=PT10S
- druid_manager_rules_pollDuration=PT10S
- druid_manager_segments_pollDuration=PT10S
- druid_coordinator_period=PT10S
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
druid-overlord:
extends:
file: docker-compose.base.yml
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- DRUID_LOG_PATH=/shared/logs/ha-overlord-one.log
depends_on:
- druid-coordinator
- druid-coordinator-two
- druid-metadata-storage
- druid-zookeeper-kafka
druid-overlord-two:
extends:
file: docker-compose.base.yml
service: druid-overlord-two
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- DRUID_LOG_PATH=/shared/logs/ha-overlord-two.log
- druid_host=druid-overlord-two
depends_on:
- druid-coordinator
- druid-coordinator-two
- druid-metadata-storage
- druid-zookeeper-kafka
druid-broker:
extends:
file: docker-compose.base.yml
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-coordinator
- druid-coordinator-two
- druid-overlord
- druid-overlord-two
- druid-zookeeper-kafka
druid-router:
extends:
file: docker-compose.base.yml
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-coordinator
- druid-coordinator-two
- druid-overlord
- druid-overlord-two
- druid-broker
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24

View File

@ -35,9 +35,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@ -50,10 +47,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
@ -67,8 +60,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
@ -80,9 +71,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-overlord:druid-overlord
depends_on:
- druid-zookeeper-kafka
- druid-overlord
@ -95,10 +83,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-middlemanager:druid-middlemanager
- druid-historical:druid-historical
depends_on:
- druid-zookeeper-kafka
- druid-middlemanager
@ -112,10 +96,6 @@ services:
- ${OVERRIDE_ENV}
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator

View File

@ -35,9 +35,6 @@ services:
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
@ -48,10 +45,6 @@ services:
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
@ -63,8 +56,6 @@ services:
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
@ -74,9 +65,6 @@ services:
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-historical:druid-historical
depends_on:
- druid-zookeeper-kafka
- druid-historical
@ -87,10 +75,6 @@ services:
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator
@ -115,8 +99,6 @@ services:
- ./environment-configs/historical-for-query-retry-test
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka

View File

@ -19,10 +19,6 @@ services:
extends:
file: docker-compose.base.yml
service: druid-router-permissive-tls
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-metadata-storage
@ -37,10 +33,6 @@ services:
extends:
file: docker-compose.base.yml
service: druid-router-no-client-auth-tls
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-metadata-storage
@ -55,10 +47,6 @@ services:
extends:
file: docker-compose.base.yml
service: druid-router-custom-check-tls
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-metadata-storage

View File

@ -29,31 +29,24 @@ services:
depends_on:
- druid-zookeeper-kafka
druid-overlord:
extends:
file: docker-compose.base.yml
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
druid-coordinator:
extends:
file: docker-compose.base.yml
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
- druid-zookeeper-kafka
druid-overlord:
extends:
file: docker-compose.base.yml
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-coordinator
- druid-metadata-storage
- druid-zookeeper-kafka
@ -63,8 +56,6 @@ services:
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
@ -74,9 +65,6 @@ services:
service: druid-middlemanager
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-overlord:druid-overlord
depends_on:
- druid-zookeeper-kafka
- druid-overlord
@ -87,11 +75,8 @@ services:
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-middlemanager:druid-middlemanager
- druid-historical:druid-historical
depends_on:
- druid-coordinator
- druid-zookeeper-kafka
- druid-middlemanager
- druid-historical
@ -102,14 +87,11 @@ services:
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator
- druid-broker
- druid-overlord
networks:
druid-it-net:

View File

@ -84,7 +84,7 @@ setupData()
# The "query" and "security" test groups require data to be setup before running the tests.
# In particular, they requires segments to be download from a pre-existing s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3 credientials set below.
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop

View File

@ -21,9 +21,10 @@ DRUID_SERVICE=broker
DRUID_LOG_PATH=/shared/logs/broker.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx192m -Xms192m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
SERVICE_DRUID_JAVA_OPTS=-server -Xms192m -Xmx256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
# Druid configs
druid_host=druid-broker
druid_processing_buffer_sizeBytes=25000000
druid_query_groupBy_maxOnDiskStorage=300000000
druid_server_http_numThreads=40

View File

@ -66,4 +66,5 @@ druid_zk_service_host=druid-zookeeper-kafka
druid_auth_basic_common_maxSyncRetries=20
druid_indexer_logs_directory=/shared/tasklogs
druid_sql_enable=true
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_request_logging_type=slf4j

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/coordinator.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
# Druid configs
druid_host=druid-coordinator
druid_metadata_storage_type=mysql
druid_metadata_storage_connector_connectURI=jdbc:mysql://druid-metadata-storage/druid
druid_metadata_storage_connector_user=druid

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/historical.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
# Druid configs
druid_host=druid-historical
druid_processing_buffer_sizeBytes=25000000
druid_processing_numThreads=2
druid_query_groupBy_maxOnDiskStorage=300000000

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/indexer.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
# Druid configs
druid_host=druid-indexer
druid_server_http_numThreads=4
druid_storage_storageDirectory=/shared/storage

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/middlemanager.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
# Druid configs
druid_host=druid-middlemanager
druid_server_http_numThreads=100
druid_storage_storageDirectory=/shared/storage
druid_indexer_runner_javaOptsArray=["-server", "-Xmx256m", "-Xms256m", "-XX:NewSize=128m", "-XX:MaxNewSize=128m", "-XX:+UseG1GC", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"]

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/overlord.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009
# Druid configs
druid_host=druid-overlord
druid_metadata_storage_type=mysql
druid_metadata_storage_connector_connectURI=jdbc:mysql://druid-metadata-storage/druid
druid_metadata_storage_connector_user=druid

View File

@ -24,6 +24,7 @@ DRUID_LOG_PATH=/shared/logs/router.log
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004
# Druid configs
druid_host=druid-router
druid_auth_basic_common_cacheDirectory=/tmp/authCache/router
druid_sql_avatica_enable=true
druid_server_https_crlPath=/tls/revocations.crl

View File

@ -0,0 +1,20 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":445235220,\"identifier\":\"twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":435325540,\"identifier\":\"twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":411651320,\"identifier\":\"twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}');
INSERT INTO druid_segments (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES ('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z', 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', '2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', '{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip\"},\"dimensions\":\"dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup\",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":24664730,\"identifier\":\"wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z\"}');

View File

@ -412,6 +412,9 @@
<configuration>
<environmentVariables>
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_GROUP>${groups}</DRUID_INTEGRATION_TEST_GROUP>
<DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>${override.config.path}</DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
</environmentVariables>
<executable>${project.basedir}/stop_cluster.sh</executable>
</configuration>

View File

@ -0,0 +1,64 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# picks the appropriate docker-compose arguments to use when bringing up and down integration test clusters
# for a given test group
getComposeArgs()
{
if [ -z "$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH" ]
then
# Sanity check: DRUID_INTEGRATION_TEST_INDEXER must be "indexer" or "middleManager"
if [ "$DRUID_INTEGRATION_TEST_INDEXER" != "indexer" ] && [ "$DRUID_INTEGRATION_TEST_INDEXER" != "middleManager" ]
then
echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
exit 1
fi
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
then
# Sanity check: cannot combine CliIndexer tests with the security, query-retry, or high-availability test groups
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
then
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
exit 1
fi
# Replace MiddleManager with Indexer
echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# default + additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
# default + additional historical modified for query retry test
# See CliHistoricalForQueryRetryTest.
echo "-f ${DOCKERDIR}/docker-compose.query-retry-test.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
then
# the 'high availability' test cluster with multiple coordinators and overlords
echo "-f ${DOCKERDIR}/docker-compose.high-availability.yml"
else
# default
echo "-f ${DOCKERDIR}/docker-compose.yml"
fi
else
# with override config
echo "-f ${DOCKERDIR}/docker-compose.override-env.yml"
fi
}

View File

@ -16,10 +16,7 @@
set -e
# Create docker network
{
docker network create --subnet=172.172.172.0/24 druid-it-net
}
. $(dirname "$0")/docker_compose_args.sh
if [ -z "$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH" ]
then
@ -47,42 +44,6 @@ fi
docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up -d
fi
# Start Druid services
if [ -z "$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH" ]
then
# Sanity check: DRUID_INTEGRATION_TEST_INDEXER must be "indexer" or "middleManager"
if [ "$DRUID_INTEGRATION_TEST_INDEXER" != "indexer" ] && [ "$DRUID_INTEGRATION_TEST_INDEXER" != "middleManager" ]
then
echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
exit 1
fi
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
then
# Sanity check: cannot combine CliIndexer tests with security, query-retry tests
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
exit 1
fi
# Replace MiddleManager with Indexer
docker-compose -f ${DOCKERDIR}/docker-compose.cli-indexer.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
# Start default Druid services with an additional historical modified for query retry test
# See CliHistoricalForQueryRetryTest.
docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
else
# Start default Druid services
docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
fi
else
# run druid cluster with override config
OVERRIDE_ENV=$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up -d
fi
# Start Druid cluster
docker-compose $(getComposeArgs) up -d
}

View File

@ -33,11 +33,20 @@ import java.util.Map;
public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
{
private static final Logger LOG = new Logger(ConfigFileConfigProvider.class);
private String routerHost;
private String brokerHost;
private String historicalHost;
private String coordinatorHost;
private String coordinatorTwoHost;
private String overlordHost;
private String overlordTwoHost;
private String routerUrl;
private String brokerUrl;
private String historicalUrl;
private String coordinatorUrl;
private String indexerUrl;
private String coordinatorTwoUrl;
private String overlordUrl;
private String overlordTwoUrl;
private String permissiveRouterUrl;
private String noClientAuthRouterUrl;
private String customCertCheckRouterUrl;
@ -45,7 +54,9 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
private String brokerTLSUrl;
private String historicalTLSUrl;
private String coordinatorTLSUrl;
private String indexerTLSUrl;
private String coordinatorTwoTLSUrl;
private String overlordTLSUrl;
private String overlordTwoTLSUrl;
private String permissiveRouterTLSUrl;
private String noClientAuthRouterTLSUrl;
private String customCertCheckRouterTLSUrl;
@ -79,17 +90,16 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
catch (IOException ex) {
throw new RuntimeException(ex);
}
routerHost = props.get("router_host");
// there might not be a router; we want routerHost to be null in that case
routerUrl = props.get("router_url");
if (routerUrl == null) {
String routerHost = props.get("router_host");
if (null != routerHost) {
routerUrl = StringUtils.format("http://%s:%s", routerHost, props.get("router_port"));
}
}
routerTLSUrl = props.get("router_tls_url");
if (routerTLSUrl == null) {
String routerHost = props.get("router_host");
if (null != routerHost) {
routerTLSUrl = StringUtils.format("https://%s:%s", routerHost, props.get("router_tls_port"));
}
@ -137,51 +147,74 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
}
}
brokerHost = props.get("broker_host");
brokerUrl = props.get("broker_url");
if (brokerUrl == null) {
brokerUrl = StringUtils.format("http://%s:%s", props.get("broker_host"), props.get("broker_port"));
}
brokerTLSUrl = props.get("broker_tls_url");
if (brokerTLSUrl == null) {
String brokerHost = props.get("broker_host");
if (null != brokerHost) {
brokerTLSUrl = StringUtils.format("https://%s:%s", brokerHost, props.get("broker_tls_port"));
}
}
historicalHost = props.get("historical_host");
historicalUrl = props.get("historical_url");
if (historicalUrl == null) {
historicalUrl = StringUtils.format("http://%s:%s", props.get("historical_host"), props.get("historical_port"));
}
historicalTLSUrl = props.get("historical_tls_url");
if (historicalTLSUrl == null) {
String historicalHost = props.get("historical_host");
if (null != historicalHost) {
historicalTLSUrl = StringUtils.format("https://%s:%s", historicalHost, props.get("historical_tls_port"));
}
}
coordinatorHost = props.get("coordinator_host");
coordinatorUrl = props.get("coordinator_url");
if (coordinatorUrl == null) {
coordinatorUrl = StringUtils.format("http://%s:%s", props.get("coordinator_host"), props.get("coordinator_port"));
coordinatorUrl = StringUtils.format("http://%s:%s", coordinatorHost, props.get("coordinator_port"));
}
coordinatorTLSUrl = props.get("coordinator_tls_url");
if (coordinatorTLSUrl == null) {
String coordinatorHost = props.get("coordinator_host");
if (null != coordinatorHost) {
coordinatorTLSUrl = StringUtils.format("https://%s:%s", coordinatorHost, props.get("coordinator_tls_port"));
}
}
indexerUrl = props.get("indexer_url");
if (indexerUrl == null) {
indexerUrl = StringUtils.format("http://%s:%s", props.get("indexer_host"), props.get("indexer_port"));
overlordHost = props.get("indexer_host");
overlordUrl = props.get("indexer_url");
if (overlordUrl == null) {
overlordUrl = StringUtils.format("http://%s:%s", overlordHost, props.get("indexer_port"));
}
indexerTLSUrl = props.get("indexer_tls_url");
if (indexerTLSUrl == null) {
String indexerHost = props.get("indexer_host");
if (null != indexerHost) {
indexerTLSUrl = StringUtils.format("https://%s:%s", indexerHost, props.get("indexer_tls_port"));
overlordTLSUrl = props.get("indexer_tls_url");
if (overlordTLSUrl == null) {
if (null != overlordHost) {
overlordTLSUrl = StringUtils.format("https://%s:%s", overlordHost, props.get("indexer_tls_port"));
}
}
coordinatorTwoHost = props.get("coordinator_two_host");
coordinatorTwoUrl = props.get("coordinator_two_url");
if (coordinatorTwoUrl == null) {
coordinatorTwoUrl = StringUtils.format("http://%s:%s", coordinatorTwoHost, props.get("coordinator_two_port"));
}
coordinatorTwoTLSUrl = props.get("coordinator_two_tls_url");
if (coordinatorTwoTLSUrl == null) {
if (null != coordinatorTwoHost) {
coordinatorTwoTLSUrl = StringUtils.format("https://%s:%s", coordinatorTwoHost, props.get("coordinator_two_tls_port"));
}
}
overlordTwoHost = props.get("overlord_two_host");
overlordTwoUrl = props.get("overlord_two_url");
if (overlordTwoUrl == null) {
overlordTwoUrl = StringUtils.format("http://%s:%s", overlordTwoHost, props.get("overlord_two_port"));
}
overlordTwoTLSUrl = props.get("overlord_two_tls_url");
if (overlordTwoTLSUrl == null) {
if (null != overlordTwoHost) {
overlordTwoTLSUrl = StringUtils.format("https://%s:%s", overlordTwoHost, props.get("overlord_two_tls_port"));
}
}
@ -205,7 +238,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
LOG.info("broker: [%s], [%s]", brokerUrl, brokerTLSUrl);
LOG.info("historical: [%s], [%s]", historicalUrl, historicalTLSUrl);
LOG.info("coordinator: [%s], [%s]", coordinatorUrl, coordinatorTLSUrl);
LOG.info("overlord: [%s], [%s]", indexerUrl, indexerTLSUrl);
LOG.info("overlord: [%s], [%s]", overlordUrl, overlordTLSUrl);
LOG.info("middle manager: [%s]", middleManagerHost);
LOG.info("zookeepers: [%s]", zookeeperHosts);
LOG.info("kafka: [%s]", kafkaHost);
@ -230,16 +263,54 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return coordinatorTLSUrl;
}
@Override
public String getCoordinatorTwoUrl()
{
return coordinatorTwoUrl;
}
@Override
public String getCoordinatorTwoTLSUrl()
{
return coordinatorTwoTLSUrl;
}
@Override
public String getOverlordUrl()
{
return overlordUrl;
}
@Override
public String getOverlordTLSUrl()
{
return overlordTLSUrl;
}
@Override
public String getOverlordTwoUrl()
{
return overlordTwoUrl;
}
@Override
public String getOverlordTwoTLSUrl()
{
return overlordTwoTLSUrl;
}
@Override
public String getIndexerUrl()
{
return indexerUrl;
// no way to configure this since the config was stolen by the overlord
return null;
}
@Override
public String getIndexerTLSUrl()
{
return indexerTLSUrl;
// no way to configure this since the config was stolen by the overlord
return null;
}
@Override
@ -320,6 +391,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return middleManagerHost;
}
@Override
public String getHistoricalHost()
{
return historicalHost;
}
@Override
public String getZookeeperHosts()
{
@ -332,6 +409,42 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return kafkaHost;
}
@Override
public String getBrokerHost()
{
return brokerHost;
}
@Override
public String getRouterHost()
{
return routerHost;
}
@Override
public String getCoordinatorHost()
{
return coordinatorHost;
}
@Override
public String getCoordinatorTwoHost()
{
return coordinatorTwoHost;
}
@Override
public String getOverlordHost()
{
return overlordHost;
}
@Override
public String getOverlordTwoHost()
{
return overlordTwoHost;
}
@Override
public String getProperty(String keyword)
{

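Editor's note: the pattern repeated above for each new service entry is the same — an explicit `*_url` property wins, otherwise the URL is assembled from the service's host and port properties. A minimal self-contained sketch of that rule, with hypothetical names (the provider itself inlines this logic per service):

import java.util.Map;

public final class UrlFallbackSketch
{
  // Prefer an explicit "<service>_url" property; otherwise assemble the URL
  // from "<service>_host" and the given port property when a host is set.
  static String resolveUrl(Map<String, String> props, String service, String scheme, String portProperty)
  {
    String url = props.get(service + "_url");
    if (url == null) {
      String host = props.get(service + "_host");
      if (host != null) {
        url = scheme + "://" + host + ":" + props.get(portProperty);
      }
    }
    return url;
  }
}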

@ -33,7 +33,6 @@ import java.util.Map;
public class DockerConfigProvider implements IntegrationTestingConfigProvider
{
@JsonProperty
@NotNull
private String dockerIp;
@ -81,17 +80,41 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
}
@Override
public String getIndexerUrl()
public String getCoordinatorTwoUrl()
{
return "http://" + dockerIp + ":8581";
}
@Override
public String getCoordinatorTwoTLSUrl()
{
return "https://" + dockerIp + ":8781";
}
@Override
public String getOverlordUrl()
{
return "http://" + dockerIp + ":8090";
}
@Override
public String getIndexerTLSUrl()
public String getOverlordTLSUrl()
{
return "https://" + dockerIp + ":8290";
}
@Override
public String getOverlordTwoUrl()
{
return "http://" + dockerIp + ":8590";
}
@Override
public String getOverlordTwoTLSUrl()
{
return "https://" + dockerIp + ":8790";
}
@Override
public String getRouterUrl()
{
@ -164,18 +187,78 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return "https://" + dockerIp + ":8283";
}
@Override
public String getIndexerUrl()
{
return "http://" + dockerIp + ":8091";
}
@Override
public String getIndexerTLSUrl()
{
return "https://" + dockerIp + ":8291";
}
@Override
public String getMiddleManagerHost()
{
return dockerIp;
}
@Override
public String getHistoricalHost()
{
return dockerIp;
}
@Override
public String getBrokerHost()
{
return dockerIp;
}
@Override
public String getRouterHost()
{
return dockerIp;
}
@Override
public String getCoordinatorHost()
{
return dockerIp;
}
@Override
public String getCoordinatorTwoHost()
{
return dockerIp;
}
@Override
public String getOverlordHost()
{
return dockerIp;
}
@Override
public String getOverlordTwoHost()
{
return dockerIp;
}
@Override
public String getZookeeperHosts()
{
return dockerIp + ":2181";
}
@Override
public String getKafkaHost()
{
return dockerIp + ":9093";
}
@Override
public String getZookeeperInternalHosts()
{
@ -183,12 +266,6 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return "druid-zookeeper-kafka:2181";
}
@Override
public String getKafkaHost()
{
return dockerIp + ":9093";
}
@Override
public String getKafkaInternalHost()
{
@ -196,6 +273,49 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return "druid-zookeeper-kafka:9092";
}
@Override
public String getBrokerInternalHost()
{
return "druid-broker";
}
@Override
public String getRouterInternalHost()
{
return "druid-router";
}
@Override
public String getCoordinatorInternalHost()
{
return "druid-coordinator";
}
@Override
public String getCoordinatorTwoInternalHost()
{
return "druid-coordinator-two";
}
@Override
public String getOverlordInternalHost()
{
return "druid-overlord";
}
@Override
public String getOverlordTwoInternalHost()
{
return "druid-overlord-two";
}
@Override
public String getHistoricalInternalHost()
{
return "druid-historical";
}
@Override
public String getProperty(String prop)
{

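Editor's note: the port choices above follow a simple convention — each second instance listens 500 ports above its primary (coordinator 8081/8281 vs. 8581/8781, overlord 8090/8290 vs. 8590/8790). A hypothetical helper capturing that offset; the provider itself hard-codes each URL:

final class PortConvention
{
  // Illustrative only: second coordinator/overlord instances sit 500 ports
  // above the primaries in the docker-compose cluster.
  static String secondInstanceUrl(String scheme, String dockerIp, int primaryPort)
  {
    return scheme + "://" + dockerIp + ":" + (primaryPort + 500);
  }
}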

@ -25,10 +25,92 @@ import java.util.Map;
*/
public interface IntegrationTestingConfig
{
String getZookeeperHosts();
default String getZookeeperInternalHosts()
{
return getZookeeperHosts();
}
String getKafkaHost();
default String getKafkaInternalHost()
{
return getKafkaHost();
}
String getBrokerHost();
default String getBrokerInternalHost()
{
return getBrokerHost();
}
String getRouterHost();
default String getRouterInternalHost()
{
return getRouterHost();
}
String getCoordinatorHost();
default String getCoordinatorInternalHost()
{
return getCoordinatorHost();
}
String getCoordinatorTwoHost();
default String getCoordinatorTwoInternalHost()
{
return getCoordinatorTwoHost();
}
String getOverlordHost();
default String getOverlordInternalHost()
{
return getOverlordHost();
}
String getOverlordTwoHost();
default String getOverlordTwoInternalHost()
{
return getOverlordTwoHost();
}
String getMiddleManagerHost();
default String getMiddleManagerInternalHost()
{
return getMiddleManagerHost();
}
String getHistoricalHost();
default String getHistoricalInternalHost()
{
return getHistoricalHost();
}
String getCoordinatorUrl();
String getCoordinatorTLSUrl();
String getCoordinatorTwoUrl();
String getCoordinatorTwoTLSUrl();
String getOverlordUrl();
String getOverlordTLSUrl();
String getOverlordTwoUrl();
String getOverlordTwoTLSUrl();
String getIndexerUrl();
String getIndexerTLSUrl();
@ -57,22 +139,6 @@ public interface IntegrationTestingConfig
String getHistoricalTLSUrl();
String getMiddleManagerHost();
String getZookeeperHosts();
default String getZookeeperInternalHosts()
{
return getZookeeperHosts();
}
String getKafkaHost();
default String getKafkaInternalHost()
{
return getKafkaHost();
}
String getProperty(String prop);
String getUsername();

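Editor's note: the new `get*InternalHost()` defaults fall back to the externally reachable host, so only the Docker provider needs overrides returning container-network names. A self-contained illustration of the pattern (the `HostConfig`/`DockerHosts` names below are illustrative, not part of the change):

interface HostConfig
{
  String getCoordinatorHost();

  // Defaults to the external host; providers that run Druid inside a Docker
  // network override this with the container name.
  default String getCoordinatorInternalHost()
  {
    return getCoordinatorHost();
  }
}

final class DockerHosts implements HostConfig
{
  @Override
  public String getCoordinatorHost()
  {
    return "172.172.172.10"; // externally reachable docker IP, illustrative
  }

  @Override
  public String getCoordinatorInternalHost()
  {
    return "druid-coordinator"; // name on the docker network
  }
}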

@ -66,7 +66,7 @@ public class OverlordResourceTestClient
{
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.indexer = config.getIndexerUrl();
this.indexer = config.getOverlordUrl();
}
private String getIndexerURL()


@ -47,8 +47,10 @@ public class DruidClusterAdminClient
{
private static final Logger LOG = new Logger(DruidClusterAdminClient.class);
private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator";
private static final String COORDINATOR_TWO_DOCKER_CONTAINER_NAME = "/druid-coordinator-two";
private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical";
private static final String OVERLORD_DOCKER_CONTAINER_NAME = "/druid-overlord";
private static final String OVERLORD_TWO_DOCKER_CONTAINER_NAME = "/druid-overlord-two";
private static final String BROKER_DOCKER_CONTAINER_NAME = "/druid-broker";
private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";
@ -74,6 +76,11 @@ public class DruidClusterAdminClient
restartDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME);
}
public void restartCoordinatorTwoContainer()
{
restartDockerContainer(COORDINATOR_TWO_DOCKER_CONTAINER_NAME);
}
public void restartHistoricalContainer()
{
restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME);
@ -84,6 +91,11 @@ public class DruidClusterAdminClient
restartDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME);
}
public void restartOverlordTwoContainer()
{
restartDockerContainer(OVERLORD_TWO_DOCKER_CONTAINER_NAME);
}
public void restartBrokerContainer()
{
restartDockerContainer(BROKER_DOCKER_CONTAINER_NAME);
@ -102,7 +114,22 @@ public class DruidClusterAdminClient
public void waitUntilCoordinatorReady()
{
waitUntilInstanceReady(config.getCoordinatorUrl());
postDynamicConfig(CoordinatorDynamicConfig.builder().withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1).build());
postDynamicConfig(CoordinatorDynamicConfig.builder()
.withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1)
.build());
}
public void waitUntilCoordinatorTwoReady()
{
waitUntilInstanceReady(config.getCoordinatorTwoUrl());
postDynamicConfig(CoordinatorDynamicConfig.builder()
.withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1)
.build());
}
public void waitUntilOverlordTwoReady()
{
waitUntilInstanceReady(config.getOverlordTwoUrl());
}
public void waitUntilHistoricalReady()
@ -112,7 +139,7 @@ public class DruidClusterAdminClient
public void waitUntilIndexerReady()
{
waitUntilInstanceReady(config.getIndexerUrl());
waitUntilInstanceReady(config.getOverlordUrl());
}
public void waitUntilBrokerReady()
@ -180,7 +207,8 @@ public class DruidClusterAdminClient
).get();
LOG.info("%s %s", response.getStatus(), response.getContent());
return response.getStatus().equals(HttpResponseStatus.OK);
// if coordinator is not leader then it will return 307 instead of 200
return response.getStatus().equals(HttpResponseStatus.OK) || response.getStatus().equals(HttpResponseStatus.TEMPORARY_REDIRECT);
}
catch (Throwable e) {
LOG.error(e, "");

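Editor's note: the readiness probe above now treats a 307 the same as a 200 because, per the comment in the hunk, a non-leader coordinator or overlord redirects to the current leader. A minimal sketch of the predicate, assuming the same Netty status constants used in the file:

import org.jboss.netty.handler.codec.http.HttpResponseStatus;

final class ReadinessSketch
{
  // A non-leader coordinator/overlord answers with a 307 redirect to the
  // leader, so both 200 and 307 mean the instance itself is up.
  static boolean isUp(HttpResponseStatus status)
  {
    return HttpResponseStatus.OK.equals(status)
        || HttpResponseStatus.TEMPORARY_REDIRECT.equals(status);
  }
}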

@ -30,9 +30,9 @@ public class ITRetryUtil
private static final Logger LOG = new Logger(ITRetryUtil.class);
public static final int DEFAULT_RETRY_COUNT = 150; // 5 minutes
public static final int DEFAULT_RETRY_COUNT = 120; // 10 minutes
public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(2);
public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(5);
public static void retryUntilTrue(Callable<Boolean> callable, String task)
{

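Editor's note: the totals behind the updated comments work out as follows — the old defaults allowed 150 retries × 2 s = 5 minutes of waiting, while the new defaults allow 120 retries × 5 s = 10 minutes.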

@ -56,7 +56,7 @@ public class ITTLSCertificateChecker implements TLSCertificateChecker
baseTrustManager.checkServerTrusted(chain, authType, engine);
// fail intentionally when trying to talk to the broker
if (chain[0].toString().contains("172.172.172.8")) {
if (chain[0].toString().contains("172.172.172.60")) {
throw new CertificateException("Custom check intentionally terminated request to broker.");
}
}


@ -144,4 +144,6 @@ public class TestNGGroup
* Kinesis stream endpoint for a region must also be pass to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>
*/
public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";
public static final String HIGH_AVAILABILITY = "high-availability";
}

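Editor's note: like the other constants in this class, the new group lets the high-availability suite be selected or excluded at the Maven level (e.g. with `-Dgroups=high-availability`, mirroring the existing `-DexcludedGroups` usage); the exact invocation depends on the integration-test profile in use.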

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.leadership;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.net.URL;
@Test(groups = TestNGGroup.HIGH_AVAILABILITY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITHighAvailabilityTest
{
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/high_availability_sys.json";
private static final int NUM_LEADERSHIP_SWAPS = 3;
@Inject
private IntegrationTestingConfig config;
@Inject
private DruidClusterAdminClient druidClusterAdminClient;
@Inject
ServerDiscoveryFactory factory;
@Inject
CoordinatorResourceTestClient coordinatorClient;
@Inject
SqlTestQueryHelper queryHelper;
@Inject
ObjectMapper jsonMapper;
@Inject
@TestClient
HttpClient httpClient;
@Test
public void testLeadershipChanges() throws Exception
{
int runCount = 0;
String previousCoordinatorLeader = null;
String previousOverlordLeader = null;
// fetch current leaders, make sure queries work, then swap leaders and do it again
do {
String coordinatorLeader = getLeader("coordinator");
String overlordLeader = getLeader("indexer");
// we expect leadership swap to happen
Assert.assertNotEquals(previousCoordinatorLeader, coordinatorLeader);
Assert.assertNotEquals(previousOverlordLeader, overlordLeader);
previousCoordinatorLeader = coordinatorLeader;
previousOverlordLeader = overlordLeader;
String queries = fillTemplate(
config,
AbstractIndexerTest.getResourceAsString(SYSTEM_QUERIES_RESOURCE),
overlordLeader,
coordinatorLeader
);
queryHelper.testQueriesFromString(queries);
swapLeadersAndWait(coordinatorLeader, overlordLeader);
} while (runCount++ < NUM_LEADERSHIP_SWAPS);
}
private void swapLeadersAndWait(String coordinatorLeader, String overlordLeader)
{
Runnable waitUntilCoordinatorSupplier;
if (isCoordinatorOneLeader(config, coordinatorLeader)) {
druidClusterAdminClient.restartCoordinatorContainer();
waitUntilCoordinatorSupplier = () -> druidClusterAdminClient.waitUntilCoordinatorReady();
} else {
druidClusterAdminClient.restartCoordinatorTwoContainer();
waitUntilCoordinatorSupplier = () -> druidClusterAdminClient.waitUntilCoordinatorTwoReady();
}
Runnable waitUntilOverlordSupplier;
if (isOverlordOneLeader(config, overlordLeader)) {
druidClusterAdminClient.restartOverlordContainer();
waitUntilOverlordSupplier = () -> druidClusterAdminClient.waitUntilIndexerReady();
} else {
druidClusterAdminClient.restartOverlordTwoContainer();
waitUntilOverlordSupplier = () -> druidClusterAdminClient.waitUntilOverlordTwoReady();
}
waitUntilCoordinatorSupplier.run();
waitUntilOverlordSupplier.run();
}
private String getLeader(String service)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(StringUtils.format(
"%s/druid/%s/v1/leader",
config.getRouterUrl(),
service
))
),
StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching leader from[%s] status[%s] content[%s]",
config.getRouterUrl(),
response.getStatus(),
response.getContent()
);
}
return response.getContent();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private static String fillTemplate(IntegrationTestingConfig config, String template, String overlordLeader, String coordinatorLeader)
{
/*
{"host":"%%BROKER%%","server_type":"broker", "is_leader": %%NON_LEADER%%},
{"host":"%%COORDINATOR_ONE%%","server_type":"coordinator", "is_leader": %%COORDINATOR_ONE_LEADER%%},
{"host":"%%COORDINATOR_TWO%%","server_type":"coordinator", "is_leader": %%COORDINATOR_TWO_LEADER%%},
{"host":"%%OVERLORD_ONE%%","server_type":"overlord", "is_leader": %%OVERLORD_ONE_LEADER%%},
{"host":"%%OVERLORD_TWO%%","server_type":"overlord", "is_leader": %%OVERLORD_TWO_LEADER%%},
{"host":"%%ROUTER%%","server_type":"router", "is_leader": %%NON_LEADER%%}
*/
String working = template;
working = StringUtils.replace(working, "%%OVERLORD_ONE%%", config.getOverlordInternalHost());
working = StringUtils.replace(working, "%%OVERLORD_TWO%%", config.getOverlordTwoInternalHost());
working = StringUtils.replace(working, "%%COORDINATOR_ONE%%", config.getCoordinatorInternalHost());
working = StringUtils.replace(working, "%%COORDINATOR_TWO%%", config.getCoordinatorTwoInternalHost());
working = StringUtils.replace(working, "%%BROKER%%", config.getBrokerInternalHost());
working = StringUtils.replace(working, "%%ROUTER%%", config.getRouterInternalHost());
if (isOverlordOneLeader(config, overlordLeader)) {
working = StringUtils.replace(working, "%%OVERLORD_ONE_LEADER%%", "1");
working = StringUtils.replace(working, "%%OVERLORD_TWO_LEADER%%", "0");
} else {
working = StringUtils.replace(working, "%%OVERLORD_ONE_LEADER%%", "0");
working = StringUtils.replace(working, "%%OVERLORD_TWO_LEADER%%", "1");
}
if (isCoordinatorOneLeader(config, coordinatorLeader)) {
working = StringUtils.replace(working, "%%COORDINATOR_ONE_LEADER%%", "1");
working = StringUtils.replace(working, "%%COORDINATOR_TWO_LEADER%%", "0");
} else {
working = StringUtils.replace(working, "%%COORDINATOR_ONE_LEADER%%", "0");
working = StringUtils.replace(working, "%%COORDINATOR_TWO_LEADER%%", "1");
}
working = StringUtils.replace(working, "%%NON_LEADER%%", String.valueOf(NullHandling.defaultLongValue()));
return working;
}
private static boolean isCoordinatorOneLeader(IntegrationTestingConfig config, String coordinatorLeader)
{
return coordinatorLeader.contains(transformHost(config.getCoordinatorInternalHost()));
}
private static boolean isOverlordOneLeader(IntegrationTestingConfig config, String overlordLeader)
{
return overlordLeader.contains(transformHost(config.getOverlordInternalHost()));
}
/**
* host + ':', which should be enough to tell apart hosts where one name is a prefix of another, e.g. 'druid-coordinator:8081' from
* 'druid-coordinator-two:8081'
*/
private static String transformHost(String host)
{
return StringUtils.format("%s:", host);
}
}

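Editor's note: the leader lookup above goes through the router, which proxies `/druid/coordinator/v1/leader` and `/druid/indexer/v1/leader` (the overlord still answers under the legacy "indexer" service name, which is why the test asks for "indexer"). A standalone sketch using only JDK classes; the router address is an assumption (8888 is the default router port):

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

final class LeaderLookup
{
  // service is "coordinator" or "indexer"; returns the leader's URL as text.
  static String getLeader(String routerUrl, String service) throws Exception
  {
    URL url = new URL(routerUrl + "/druid/" + service + "/v1/leader");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (InputStream in = conn.getInputStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
    finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(getLeader("http://localhost:8888", "coordinator"));
  }
}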

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.testing.IntegrationTestingConfig;
@ -41,6 +42,7 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class);
private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json";
private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json";
private static final String BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE = "/queries/broadcast_join_after_drop_metadata_queries.json";
@ -110,6 +112,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
return true;
}
catch (Exception ex) {
LOG.error(ex, "SQL metadata not yet in expected state");
return false;
}
},
@ -139,6 +142,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
return true;
}
catch (Exception ex) {
LOG.error(ex, "SQL metadata not yet in expected state");
return false;
}
},


@ -36,7 +36,7 @@ public class ITSystemTableQueryTest
{
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final String TWITTER_DATA_SOURCE = "twitterstream";
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/sys_segment_queries.json";
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/sys_queries.json";
@Inject
CoordinatorResourceTestClient coordinatorClient;


@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.calcite.avatica.AvaticaSqlException;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -47,6 +48,7 @@ import org.apache.druid.testing.utils.HttpUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
@ -216,13 +218,19 @@ public class ITBasicAuthConfigurationTest
);
final List<Map<String, Object>> adminServerSegments = jsonMapper.readValue(
TestQueryHelper.class.getResourceAsStream(SYSTEM_SCHEMA_SERVER_SEGMENTS_RESULTS_RESOURCE),
fillSegmentServersTemplate(
config,
AbstractIndexerTest.getResourceAsString(SYSTEM_SCHEMA_SERVER_SEGMENTS_RESULTS_RESOURCE)
),
SYS_SCHEMA_RESULTS_TYPE_REFERENCE
);
final List<Map<String, Object>> adminServers = getServersWithoutCurrentSize(
jsonMapper.readValue(
TestQueryHelper.class.getResourceAsStream(SYSTEM_SCHEMA_SERVERS_RESULTS_RESOURCE),
fillServersTemplate(
config,
AbstractIndexerTest.getResourceAsString(SYSTEM_SCHEMA_SERVERS_RESULTS_RESOURCE)
),
SYS_SCHEMA_RESULTS_TYPE_REFERENCE
)
);
@ -542,7 +550,7 @@ public class ITBasicAuthConfigurationTest
private void testOptionsRequests(HttpClient httpClient)
{
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getCoordinatorUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getIndexerUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getOverlordUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getBrokerUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getHistoricalUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getRouterUrl() + "/status", null);
@ -601,7 +609,7 @@ public class ITBasicAuthConfigurationTest
private void checkNodeAccess(HttpClient httpClient)
{
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getCoordinatorUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getIndexerUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getOverlordUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getHistoricalUrl() + "/status", null);
HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getRouterUrl() + "/status", null);
@ -610,7 +618,7 @@ public class ITBasicAuthConfigurationTest
private void checkLoadStatus(HttpClient httpClient) throws Exception
{
checkLoadStatusSingle(httpClient, config.getCoordinatorUrl());
checkLoadStatusSingle(httpClient, config.getIndexerUrl());
checkLoadStatusSingle(httpClient, config.getOverlordUrl());
checkLoadStatusSingle(httpClient, config.getBrokerUrl());
checkLoadStatusSingle(httpClient, config.getHistoricalUrl());
checkLoadStatusSingle(httpClient, config.getRouterUrl());
@ -794,4 +802,18 @@ public class ITBasicAuthConfigurationTest
}
);
}
private static String fillSegmentServersTemplate(IntegrationTestingConfig config, String template)
{
String json = StringUtils.replace(template, "%%HISTORICAL%%", config.getHistoricalInternalHost());
return json;
}
private static String fillServersTemplate(IntegrationTestingConfig config, String template)
{
String json = StringUtils.replace(template, "%%HISTORICAL%%", config.getHistoricalInternalHost());
json = StringUtils.replace(json, "%%BROKER%%", config.getBrokerInternalHost());
json = StringUtils.replace(json, "%%NON_LEADER%%", String.valueOf(NullHandling.defaultLongValue()));
return json;
}
}


@ -96,7 +96,7 @@ public class ITTLSTest
httpClient
);
makeRequest(adminClient, HttpMethod.GET, config.getCoordinatorUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getIndexerUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getOverlordUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getHistoricalUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getRouterUrl() + "/status", null);
@ -113,7 +113,7 @@ public class ITTLSTest
httpClient
);
makeRequest(adminClient, HttpMethod.GET, config.getCoordinatorTLSUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getIndexerTLSUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getOverlordTLSUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getBrokerTLSUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getHistoricalTLSUrl() + "/status", null);
makeRequest(adminClient, HttpMethod.GET, config.getRouterTLSUrl() + "/status", null);
@ -130,7 +130,7 @@ public class ITTLSTest
"intermediate_ca_client"
);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getCoordinatorTLSUrl() + "/status", null);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getIndexerTLSUrl() + "/status", null);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getOverlordTLSUrl() + "/status", null);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getBrokerTLSUrl() + "/status", null);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getHistoricalTLSUrl() + "/status", null);
makeRequest(intermediateCertClient, HttpMethod.GET, config.getRouterTLSUrl() + "/status", null);
@ -144,7 +144,7 @@ public class ITTLSTest
LOG.info("---------Testing TLS resource access without a certificate---------");
HttpClient certlessClient = makeCertlessClient();
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessNoCert(certlessClient, HttpMethod.GET, config.getRouterTLSUrl());
@ -161,7 +161,7 @@ public class ITTLSTest
"invalid_hostname_client"
);
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessWrongHostname(wrongHostnameClient, HttpMethod.GET, config.getRouterTLSUrl());
@ -178,7 +178,7 @@ public class ITTLSTest
"druid_another_root"
);
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessWrongRoot(wrongRootClient, HttpMethod.GET, config.getRouterTLSUrl());
@ -195,7 +195,7 @@ public class ITTLSTest
"revoked_druid"
);
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessRevoked(revokedClient, HttpMethod.GET, config.getRouterTLSUrl());
@ -212,7 +212,7 @@ public class ITTLSTest
"expired_client"
);
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessExpired(expiredClient, HttpMethod.GET, config.getRouterTLSUrl());
@ -230,7 +230,7 @@ public class ITTLSTest
"invalid_ca_client"
);
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getCoordinatorTLSUrl());
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getIndexerTLSUrl());
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getOverlordTLSUrl());
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getBrokerTLSUrl());
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getHistoricalTLSUrl());
checkFailedAccessNotCA(notCAClient, HttpMethod.GET, config.getRouterTLSUrl());


@ -0,0 +1,39 @@
[
{
"description": "query sys.servers to make sure all expected servers are available",
"query": {
"query": "SELECT host, server_type, is_leader FROM sys.servers ORDER BY host"
},
"expectedResults": [
{"host":"%%BROKER%%","server_type":"broker", "is_leader": %%NON_LEADER%%},
{"host":"%%COORDINATOR_ONE%%","server_type":"coordinator", "is_leader": %%COORDINATOR_ONE_LEADER%%},
{"host":"%%COORDINATOR_TWO%%","server_type":"coordinator", "is_leader": %%COORDINATOR_TWO_LEADER%%},
{"host":"%%OVERLORD_ONE%%","server_type":"overlord", "is_leader": %%OVERLORD_ONE_LEADER%%},
{"host":"%%OVERLORD_TWO%%","server_type":"overlord", "is_leader": %%OVERLORD_TWO_LEADER%%},
{"host":"%%ROUTER%%","server_type":"router", "is_leader": %%NON_LEADER%%}
]
},
{
"description": "query sys.segments which is fed via coordinator data",
"query": {
"query": "SELECT datasource, count(*) FROM sys.segments WHERE datasource='wikipedia_editstream' OR datasource='twitterstream' GROUP BY 1 "
},
"expectedResults": [
{
"datasource": "wikipedia_editstream",
"EXPR$1": 1
},
{
"datasource": "twitterstream",
"EXPR$1": 3
}
]
},
{
"description": "query sys.tasks which is fed via overlord",
"query": {
"query": "SELECT datasource, count(*) FROM sys.tasks WHERE datasource='wikipedia_editstream' OR datasource='twitterstream' GROUP BY 1 "
},
"expectedResults": []
}
]

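Editor's note: in the expected results above, `is_leader` is 1 for the current leader, 0 for a standby coordinator or overlord, and `%%NON_LEADER%%` — the null-handling default, i.e. null in SQL-compatible mode or 0 in replace-with-default mode — for server types with no notion of leadership. A query such as `SELECT host FROM sys.servers WHERE server_type = 'coordinator' AND is_leader = 1` would, for example, pick out the active coordinator.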

@ -1,6 +1,6 @@
[
{
"server": "172.172.172.6:8283",
"server": "%%HISTORICAL%%:8283",
"segment_id": "auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9"
}
]


@ -1,22 +1,24 @@
[
{
"server": "172.172.172.6:8283",
"host": "172.172.172.6",
"server": "%%HISTORICAL%%:8283",
"host": "%%HISTORICAL%%",
"plaintext_port": 8083,
"tls_port": 8283,
"server_type": "historical",
"tier": "_default_tier",
"curr_size": 2208932412,
"max_size": 5000000000
"max_size": 5000000000,
"is_leader": %%NON_LEADER%%
},
{
"server": "172.172.172.9:8282",
"host": "172.172.172.9",
"server": "%%BROKER%%:8282",
"host": "%%BROKER%%",
"plaintext_port": 8082,
"tls_port": 8282,
"server_type": "broker",
"tier": "_default_tier",
"curr_size": 0,
"max_size": 1000000000
"max_size": 1000000000,
"is_leader": %%NON_LEADER%%
}
]


@ -16,22 +16,17 @@
set -e
. $(dirname "$0")/script/docker_compose_args.sh
DOCKERDIR=$(dirname "$0")/docker
# Skip stopping docker if flag set (For use during development)
if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" == true ]
then
exit 0
fi
then
exit 0
fi
for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-indexer druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
do
CONTAINER="$(docker ps -aq -f name=${node})"
if [ ! -z "$CONTAINER" ]
then
docker stop $node
docker rm $node
fi
done
docker-compose $(getComposeArgs) down
if [ ! -z "$(docker network ls -q -f name=druid-it-net)" ]
then

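Editor's note: rather than stopping and removing each container by name, the script now delegates to `docker-compose $(getComposeArgs) down`, which stops and removes every container (and the default network) created by the selected compose files, so the container list no longer has to be kept in sync by hand.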

@ -50,6 +50,7 @@ import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
@ -159,6 +160,7 @@ public class SystemSchema extends AbstractSchema
.add("tier", ValueType.STRING)
.add("curr_size", ValueType.LONG)
.add("max_size", ValueType.LONG)
.add("is_leader", ValueType.LONG)
.build();
static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
@ -215,7 +217,7 @@ public class SystemSchema extends AbstractSchema
Preconditions.checkNotNull(serverView, "serverView");
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper, overlordDruidLeaderClient, coordinatorDruidLeaderClient),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper),
SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper)
@ -479,16 +481,22 @@ public class SystemSchema extends AbstractSchema
private final AuthorizerMapper authorizerMapper;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final InventoryView serverInventoryView;
private final DruidLeaderClient overlordLeaderClient;
private final DruidLeaderClient coordinatorLeaderClient;
public ServersTable(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
InventoryView serverInventoryView,
AuthorizerMapper authorizerMapper
AuthorizerMapper authorizerMapper,
DruidLeaderClient overlordLeaderClient,
DruidLeaderClient coordinatorLeaderClient
)
{
this.authorizerMapper = authorizerMapper;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.serverInventoryView = serverInventoryView;
this.overlordLeaderClient = overlordLeaderClient;
this.coordinatorLeaderClient = coordinatorLeaderClient;
}
@Override
@ -513,17 +521,30 @@ public class SystemSchema extends AbstractSchema
);
checkStateReadAccessForServers(authenticationResult, authorizerMapper);
String tmpCoordinatorLeader = "";
String tmpOverlordLeader = "";
try {
tmpCoordinatorLeader = coordinatorLeaderClient.findCurrentLeader();
tmpOverlordLeader = overlordLeaderClient.findCurrentLeader();
}
catch (ISE ignored) {
// no reason to kill the results if something is sad and there are no leaders
}
final String coordinatorLeader = tmpCoordinatorLeader;
final String overlordLeader = tmpOverlordLeader;
final FluentIterable<Object[]> results = FluentIterable
.from(() -> druidServers)
.transform((DiscoveryDruidNode discoveryDruidNode) -> {
//noinspection ConstantConditions
final boolean isDiscoverableDataServer = isDiscoverableDataServer(discoveryDruidNode);
final NodeRole serverRole = discoveryDruidNode.getNodeRole();
if (isDiscoverableDataServer) {
final DruidServer druidServer = serverInventoryView.getInventoryValue(
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
if (druidServer != null || discoveryDruidNode.getNodeRole().equals(NodeRole.HISTORICAL)) {
if (druidServer != null || NodeRole.HISTORICAL.equals(serverRole)) {
// Build a row for the data server if that server is in the server view, or the node type is historical.
// The historicals are usually supposed to be found in the server view. If some historicals are
// missing, it could mean they are having trouble announcing themselves. We just fill
@ -532,6 +553,16 @@ public class SystemSchema extends AbstractSchema
} else {
return buildRowForNonDataServer(discoveryDruidNode);
}
} else if (NodeRole.COORDINATOR.equals(serverRole)) {
return buildRowForNonDataServerWithLeadership(
discoveryDruidNode,
coordinatorLeader.contains(discoveryDruidNode.getDruidNode().getHostAndPortToUse())
);
} else if (NodeRole.OVERLORD.equals(serverRole)) {
return buildRowForNonDataServerWithLeadership(
discoveryDruidNode,
overlordLeader.contains(discoveryDruidNode.getDruidNode().getHostAndPortToUse())
);
} else {
return buildRowForNonDataServer(discoveryDruidNode);
}
@ -539,6 +570,7 @@ public class SystemSchema extends AbstractSchema
return Linq4j.asEnumerable(results);
}
/**
* Returns a row for all node types which don't serve data. The returned row contains only static information.
*/
@ -553,7 +585,27 @@ public class SystemSchema extends AbstractSchema
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
null,
UNKNOWN_SIZE,
UNKNOWN_SIZE
UNKNOWN_SIZE,
NullHandling.defaultLongValue()
};
}
/**
* Returns a row for all node types which don't serve data. The returned row contains only static information.
*/
private static Object[] buildRowForNonDataServerWithLeadership(DiscoveryDruidNode discoveryDruidNode, boolean isLeader)
{
final DruidNode node = discoveryDruidNode.getDruidNode();
return new Object[]{
node.getHostAndPortToUse(),
node.getHost(),
(long) node.getPlaintextPort(),
(long) node.getTlsPort(),
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
null,
UNKNOWN_SIZE,
UNKNOWN_SIZE,
isLeader ? 1L : 0L
};
}
@ -586,7 +638,8 @@ public class SystemSchema extends AbstractSchema
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
druidServerToUse.getTier(),
currentSize,
druidServerToUse.getMaxSize()
druidServerToUse.getMaxSize(),
NullHandling.defaultLongValue()
};
}

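Editor's note: the new column is populated by matching each discovered node's host:port against the leader reported by the corresponding `DruidLeaderClient`; every service type that cannot lead gets the null-handling default. A condensed sketch of that decision (the method and parameter names are illustrative):

final class IsLeaderSketch
{
  static Long isLeaderValue(
      String serverType,        // lower-cased node role, as in the row
      String hostAndPort,       // discovered host:port of the node
      String coordinatorLeader, // leader URL from the coordinator leader client
      String overlordLeader     // leader URL from the overlord leader client
  )
  {
    if ("coordinator".equals(serverType)) {
      return coordinatorLeader.contains(hostAndPort) ? 1L : 0L;
    }
    if ("overlord".equals(serverType)) {
      return overlordLeader.contains(hostAndPort) ? 1L : 0L;
    }
    return null; // NullHandling.defaultLongValue() in the real code
  }
}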

@ -40,6 +40,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
@ -145,6 +146,8 @@ public class SystemSchemaTest extends CalciteTestBase
private SystemSchema schema;
private SpecificSegmentsQuerySegmentWalker walker;
private DruidLeaderClient client;
private DruidLeaderClient coordinatorClient;
private DruidLeaderClient overlordClient;
private TimelineServerView serverView;
private ObjectMapper mapper;
private StringFullResponseHolder responseHolder;
@ -179,6 +182,8 @@ public class SystemSchemaTest extends CalciteTestBase
{
serverView = EasyMock.createNiceMock(TimelineServerView.class);
client = EasyMock.createMock(DruidLeaderClient.class);
coordinatorClient = EasyMock.createMock(DruidLeaderClient.class);
overlordClient = EasyMock.createMock(DruidLeaderClient.class);
mapper = TestHelper.makeJsonMapper();
responseHolder = EasyMock.createMock(StringFullResponseHolder.class);
responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class)
@ -378,12 +383,24 @@ public class SystemSchemaTest extends CalciteTestBase
ImmutableMap.of()
);
private final DiscoveryDruidNode coordinator2 = new DiscoveryDruidNode(
new DruidNode("s1", "localhost", false, 8181, null, true, false),
NodeRole.COORDINATOR,
ImmutableMap.of()
);
private final DiscoveryDruidNode overlord = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8090, null, true, false),
NodeRole.OVERLORD,
ImmutableMap.of()
);
private final DiscoveryDruidNode overlord2 = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8190, null, true, false),
NodeRole.OVERLORD,
ImmutableMap.of()
);
private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode(
new DruidNode("s3", "localhost", false, 8082, null, true, false),
NodeRole.BROKER,
@ -516,7 +533,7 @@ public class SystemSchemaTest extends CalciteTestBase
final SystemSchema.ServersTable serversTable = (SystemSchema.ServersTable) schema.getTableMap().get("servers");
final RelDataType serverRowType = serversTable.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> serverFields = serverRowType.getFieldList();
Assert.assertEquals(8, serverFields.size());
Assert.assertEquals(9, serverFields.size());
Assert.assertEquals("server", serverFields.get(0).getName());
Assert.assertEquals(SqlTypeName.VARCHAR, serverFields.get(0).getType().getSqlTypeName());
}
@ -736,7 +753,9 @@ public class SystemSchemaTest extends CalciteTestBase
.withConstructor(
druidNodeDiscoveryProvider,
serverInventoryView,
authMapper
authMapper,
overlordClient,
coordinatorClient
)
.createMock();
EasyMock.replay(serversTable);
@ -769,8 +788,8 @@ public class SystemSchemaTest extends CalciteTestBase
.once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(peonNodeDiscovery).once();
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once();
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator, coordinator2)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord, overlord2)).once();
EasyMock.expect(brokerNodeDiscovery.getAllNodes())
.andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments))
.once();
@ -782,6 +801,9 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once();
EasyMock.expect(coordinatorClient.findCurrentLeader()).andReturn(coordinator.getDruidNode().getHostAndPortToUse()).once();
EasyMock.expect(overlordClient.findCurrentLeader()).andReturn(overlord.getDruidNode().getHostAndPortToUse()).once();
final List<DruidServer> servers = new ArrayList<>();
servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier"));
servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier"));
@ -795,7 +817,7 @@ public class SystemSchemaTest extends CalciteTestBase
.andReturn(null)
.once();
EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView);
EasyMock.replay(druidNodeDiscoveryProvider, serverInventoryView, coordinatorClient, overlordClient);
EasyMock.replay(servers.toArray(new Object[0]));
EasyMock.replay(
coordinatorNodeDiscovery,
@ -839,6 +861,7 @@ public class SystemSchemaTest extends CalciteTestBase
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
final List<Object[]> expectedRows = new ArrayList<>();
final Long nonLeader = NullHandling.defaultLongValue();
expectedRows.add(
createExpectedRow(
"brokerHost:8082",
@ -848,7 +871,8 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.BROKER,
null,
0L,
0L
0L,
nonLeader
)
);
expectedRows.add(
@ -860,19 +884,60 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.BROKER,
"tier",
0L,
1000L
1000L,
nonLeader
)
);
expectedRows.add(
createExpectedRow("histHost:8083", "histHost", 8083, -1, NodeRole.HISTORICAL, "tier", 400L, 1000L)
createExpectedRow(
"histHost:8083",
"histHost",
8083,
-1,
NodeRole.HISTORICAL,
"tier",
400L,
1000L,
nonLeader
)
);
expectedRows.add(
createExpectedRow("indexerHost:8091", "indexerHost", 8091, -1, NodeRole.INDEXER, "tier", 0L, 1000L)
createExpectedRow(
"indexerHost:8091",
"indexerHost",
8091,
-1,
NodeRole.INDEXER,
"tier",
0L,
1000L,
nonLeader
)
);
expectedRows.add(
createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L)
createExpectedRow(
"lameHost:8083",
"lameHost",
8083,
-1,
NodeRole.HISTORICAL,
"tier",
0L,
1000L,
nonLeader
)
);
expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L));
expectedRows.add(createExpectedRow(
"localhost:8080",
"localhost",
8080,
-1,
NodeRole.PEON,
"tier",
0L,
1000L,
nonLeader
));
expectedRows.add(
createExpectedRow(
"localhost:8081",
@ -882,7 +947,8 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.COORDINATOR,
null,
0L,
0L
0L,
1L
)
);
expectedRows.add(
@ -894,11 +960,22 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.BROKER,
null,
0L,
0L
0L,
nonLeader
)
);
expectedRows.add(
createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L)
createExpectedRow(
"localhost:8083",
"localhost",
8083,
-1,
NodeRole.HISTORICAL,
"tier",
200L,
1000L,
nonLeader
)
);
expectedRows.add(
createExpectedRow(
@ -909,6 +986,33 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.OVERLORD,
null,
0L,
0L,
1L
)
);
expectedRows.add(
createExpectedRow(
"localhost:8181",
"localhost",
8181,
-1,
NodeRole.COORDINATOR,
null,
0L,
0L,
0L
)
);
expectedRows.add(
createExpectedRow(
"localhost:8190",
"localhost",
8190,
-1,
NodeRole.OVERLORD,
null,
0L,
0L,
0L
)
);
@ -921,7 +1025,8 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.ROUTER,
null,
0L,
0L
0L,
nonLeader
)
);
expectedRows.add(
@ -933,10 +1038,21 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole.MIDDLE_MANAGER,
null,
0L,
0L
0L,
nonLeader
)
);
expectedRows.add(createExpectedRow("peonHost:8080", "peonHost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L));
expectedRows.add(createExpectedRow(
"peonHost:8080",
"peonHost",
8080,
-1,
NodeRole.PEON,
"tier",
0L,
1000L,
nonLeader
));
Assert.assertEquals(expectedRows.size(), rows.size());
for (int i = 0; i < rows.size(); i++) {
Assert.assertArrayEquals(expectedRows.get(i), rows.get(i));
@ -966,7 +1082,8 @@ public class SystemSchemaTest extends CalciteTestBase
NodeRole nodeRole,
@Nullable String tier,
@Nullable Long currSize,
@Nullable Long maxSize
@Nullable Long maxSize,
@Nullable Long isLeader
)
{
return new Object[]{
@ -977,7 +1094,8 @@ public class SystemSchemaTest extends CalciteTestBase
StringUtils.toLowerCase(nodeRole.toString()),
tier,
currSize,
maxSize
maxSize,
isLeader
};
}


@ -998,19 +998,45 @@ public class CalciteTests
)
{
final DruidNode coordinatorNode = new DruidNode("test", "dummy", false, 8080, null, true, false);
final DruidNode coordinatorNode = new DruidNode("test-coordinator", "dummy", false, 8081, null, true, false);
FakeDruidNodeDiscoveryProvider provider = new FakeDruidNodeDiscoveryProvider(
ImmutableMap.of(
NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDINATOR, coordinatorNode))
)
);
final DruidNode overlordNode = new DruidNode("test-overlord", "dummy", false, 8090, null, true, false);
FakeDruidNodeDiscoveryProvider overlordProvider = new FakeDruidNodeDiscoveryProvider(
ImmutableMap.of(
NodeRole.OVERLORD, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.OVERLORD, overlordNode))
)
);
final DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
new FakeHttpClient(),
provider,
NodeRole.COORDINATOR,
"/simple/leader"
);
) {
@Override
public String findCurrentLeader()
{
return coordinatorNode.getHostAndPortToUse();
}
};
final DruidLeaderClient overlordLeaderClient = new DruidLeaderClient(
new FakeHttpClient(),
overlordProvider,
NodeRole.OVERLORD,
"/simple/leader"
) {
@Override
public String findCurrentLeader()
{
return overlordNode.getHostAndPortToUse();
}
};
return new SystemSchema(
druidSchema,
@ -1024,7 +1050,7 @@ public class CalciteTests
new FakeServerInventoryView(),
authorizerMapper,
druidLeaderClient,
druidLeaderClient,
overlordLeaderClient,
provider,
getJsonMapper()
);


@ -1514,6 +1514,7 @@ exprs
group_id
interval_expr
is_available
is_leader
is_overshadowed
is_published
is_realtime