Fixes and tests related to the Indexer process. (#10631)

* Fixes and tests related to the Indexer process.

Three bugs fixed:

1) Indexers would not announce themselves as segment servers if they
   did not have storage locations defined. This used to work, but was
   broken in #9971. Fixed this by adding an "isSegmentServer" method
   to ServerType and updating SegmentLoadDropHandler to always announce
   if this method returns true.

2) Certain batch task types were written in a way that assumed "isReady"
   would be called before "run", which is not guaranteed. In particular,
   they relied on it in order to initialize "taskLockHelper". Fixed this
   by updating AbstractBatchIndexTask to ensure "isReady" is called
   before "run" for these tasks.

3) UnifiedIndexerAppenderatorsManager did not properly handle complex
   datasources. Introduced DataSourceAnalysis in order to fix this.

Test changes:

1) Add a new "docker-compose.cli-indexer.yml" config that spins up an
   Indexer instead of a MiddleManager.

2) Introduce a "USE_INDEXER" environment variable that determines if
   docker-compose will start up an Indexer or a MiddleManager.

3) Duplicate all the jdk8 tests and run them in both MiddleManager and
   Indexer mode.

4) Various adjustments to encourage fail-fast errors in the Docker
   build scripts.

5) Various adjustments to speed up integration tests and reduce memory
   usage.

6) Add another Mac-specific approach to determining a machine's own IP.
   This was useful on my development machine.

7) Update segment-count check in ITCompactionTaskTest to eliminate a
   race condition (it was looking for 6 segments, which only exist
   together briefly, until the older 4 are marked unused).

Javadoc updates:

1) AbstractBatchIndexTask: Added javadocs to determineLockGranularityXXX
   that make it clear when taskLockHelper will be initialized as a side
   effect. (Related to the second bug above.)

2) Task: Clarified that "isReady" is not guaranteed to be called before
   "run". It was already implied, but now it's explicit.

3) ZkCoordinator: Clarified deprecation message.

4) DataSegmentServerAnnouncer: Clarified deprecation message.

* Fix stop_cluster script.

* Fix sanity check in script.

* Fix hashbang lines.

* Test and doc adjustments.

* Additional tests, and adjustments for tests.

* Split ITs back out.

* Revert change to druid_coordinator_period_indexingPeriod.

* Set Indexer capacity to match MM.

* Bump up Historical memory.

* Bump down coordinator, overlord memory.

* Bump up Broker memory.
This commit is contained in:
Gian Merlino 2020-12-08 16:02:26 -08:00 committed by GitHub
parent 5324785eac
commit 96a387d972
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 692 additions and 147 deletions

View File

@ -307,6 +307,7 @@ jobs:
# Integration tests Java Compile version is set by the machine environment jdk (set by the jdk key)
# Integration tests Java Runtime version is set by the JVM_RUNTIME env property (set env key to -Djvm.runtime=<JVM_RUNTIME_VERSION>)
# Integration tests will either use MiddleManagers or Indexers
# (Currently integration tests only support running with jvm runtime 8 and 11)
# START - Integration tests for Compile with Java 8 and Run with Java 8
- &integration_batch_index
@ -314,9 +315,9 @@ jobs:
jdk: openjdk8
services: &integration_test_services
- docker
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: &run_integration_test
- ${MVN} verify -pl integration-tests -P integration-tests ${TESTNG_GROUPS} ${JVM_RUNTIME} ${MAVEN_SKIP}
- ${MVN} verify -pl integration-tests -P integration-tests ${TESTNG_GROUPS} ${JVM_RUNTIME} -Dit.indexer=${USE_INDEXER} ${MAVEN_SKIP}
after_failure: &integration_test_diags
- for v in ~/shared/logs/*.log ; do
echo $v logtail ======================== ; tail -100 $v ;
@ -326,51 +327,75 @@ jobs:
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done
- <<: *integration_batch_index
name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_input_format
name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_input_format
name: "(Compile=openjdk8, Run=openjdk8) input format integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_input_source
name: "(Compile=openjdk8, Run=openjdk8) input source integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_input_source
name: "(Compile=openjdk8, Run=openjdk8) input source integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_perfect_rollup_parallel_batch_index
name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_perfect_rollup_parallel_batch_index
name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_kafka_index
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_kafka_index
name: "(Compile=openjdk8, Run=openjdk8) kafka index, transactional kafka index integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=kafka-index,kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_kafka_index_slow
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_kafka_index_slow
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow with Indexer"
env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_kafka_transactional_index
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
@ -378,23 +403,31 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_kafka_transactional_index_slow
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow with Indexer"
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_kafka_format_tests
name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_kafka_format_tests
name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats with Indexer"
env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_query
name: "(Compile=openjdk8, Run=openjdk8) query integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
@ -402,7 +435,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) query retry integration test for missing segments"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
@ -410,7 +443,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) security integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
@ -418,7 +451,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) realtime index integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
@ -426,82 +459,94 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_compaction_tests
name: "(Compile=openjdk8, Run=openjdk8) compaction integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_compaction_tests
name: "(Compile=openjdk8, Run=openjdk8) compaction integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
name: "(Compile=openjdk8, Run=openjdk8) other integration tests"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
# END - Integration tests for Compile with Java 8 and Run with Java 8
# START - Integration tests for Compile with Java 8 and Run with Java 11
- <<: *integration_batch_index
name: "(Compile=openjdk8, Run=openjdk11) batch index integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_input_format
name: "(Compile=openjdk8, Run=openjdk11) input format integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_input_source
name: "(Compile=openjdk8, Run=openjdk11) input source integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_perfect_rollup_parallel_batch_index
name: "(Compile=openjdk8, Run=openjdk11) perfect rollup parallel batch index integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_query
name: "(Compile=openjdk8, Run=openjdk11) query integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_query_retry
name: "(Compile=openjdk8, Run=openjdk11) query retry integration test for missing segments"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_security
name: "(Compile=openjdk8, Run=openjdk11) security integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_realtime_index
name: "(Compile=openjdk8, Run=openjdk11) realtime index integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_compaction_tests
name: "(Compile=openjdk8, Run=openjdk11) compaction integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"

View File

@ -89,6 +89,8 @@ By default, the number of concurrent persist/merge operations is limited to (`dr
Separate task logs are not currently supported when using the Indexer; all task log messages will instead be logged in the Indexer process log.
The Indexer currently imposes an identical memory limit on each task. In later releases, the per-task memory limit will be removed and only the global limit will apply. The limit on concurrent merges will also be removed.
The Indexer currently imposes an identical memory limit on each task. In later releases, the per-task memory limit will be removed and only the global limit will apply. The limit on concurrent merges will also be removed.
In later releases, per-task memory usage will be dynamically managed. Please see https://github.com/apache/druid/issues/7900 for details on future enhancements to the Indexer.
The Indexer does not work properly with [`index_realtime`](../ingestion/tasks.md#index_realtime) task types. Therefore, it is not compatible with [Tranquility](../ingestion/tranquility.md). If you are using Tranquility, consider migrating to Druid's builtin [Apache Kafka](../development/extensions-core/kafka-ingestion.md) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md) ingestion options.
In later releases, per-task memory usage will be dynamically managed. Please see https://github.com/apache/druid/issues/7900 for details on future enhancements to the Indexer.

View File

@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
@ -82,7 +83,7 @@ import java.util.stream.Collectors;
/**
* Abstract class for batch tasks like {@link IndexTask}.
* Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments},
* and {@link #determineLockGranularityandTryLock} for easily acquiring task locks.
* and {@link #determineLockGranularityAndTryLock} for easily acquiring task locks.
*/
public abstract class AbstractBatchIndexTask extends AbstractTask
{
@ -122,6 +123,17 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
if (taskLockHelper == null) {
// Subclasses generally use "isReady" to initialize the taskLockHelper. It's not guaranteed to be called before
// "run", and so we call it here to ensure it happens.
//
// We're only really calling it for its side effects, and we expect it to return "true". If it doesn't, something
// strange is going on, so bail out.
if (!isReady(toolbox.getTaskActionClient())) {
throw new ISE("Cannot start; not ready!");
}
}
synchronized (this) {
if (stopped) {
return TaskStatus.failure(getId());
@ -251,9 +263,17 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
/**
* Determine lockGranularity to use and try to acquire necessary locks.
* This method respects the value of 'forceTimeChunkLock' in task context.
* If it's set to false or missing, this method checks if this task can use segmentLock.
* Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec.
*
* This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them
* to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
protected boolean determineLockGranularityAndTryLock(
TaskActionClient client,
@ -263,10 +283,20 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
? new ArrayList<>(granularitySpec.bucketIntervals().get())
: Collections.emptyList();
return determineLockGranularityandTryLock(client, intervals);
return determineLockGranularityAndTryLock(client, intervals);
}
boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
/**
* Attempts to acquire a lock that covers certain intervals.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@ -287,12 +317,22 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
return tryLockWithDetermineResult(client, result);
} else {
// This branch is the only one that will not initialize taskLockHelper.
return true;
}
}
}
boolean determineLockGranularityandTryLockWithSegments(
/**
* Attempts to acquire a lock that covers certain segments.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* This method will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
boolean determineLockGranularityAndTryLockWithSegments(
TaskActionClient client,
List<DataSegment> segments,
BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
@ -396,7 +436,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (granularityFromSegments == null
|| segmentGranularityFromSpec != null
&& (!granularityFromSegments.equals(segmentGranularityFromSpec)
|| segments.stream().anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
|| segments.stream()
.anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
// This case is one of the followings:
// 1) Segments have different granularities.
// 2) Segment granularity in ingestion spec is different from the one of existig segments.

View File

@ -312,7 +312,7 @@ public class CompactionTask extends AbstractBatchIndexTask
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}
@Override

View File

@ -476,7 +476,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
final List<Interval> allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
final DataSchema dataSchema;
if (determineIntervals) {
if (!determineLockGranularityandTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
}

View File

@ -160,8 +160,11 @@ public interface Task
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
* <p/>
* Overlord. If this method throws an exception, the task should be considered a failure.
*
* This method will not necessarily be executed before {@link #run(TaskToolbox)}, since this task readiness check
* may be done on a different machine from the one that actually runs the task.
*
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)

View File

@ -51,7 +51,7 @@ import java.util.stream.Collectors;
* - {@link #verifyAndLockExistingSegments} is to verify the granularity of existing segments and lock them.
* This method must be called before the task starts indexing.
* - Tells the task what {@link LockGranularity} it should use. Note that the LockGranularity is determined in
* {@link AbstractBatchIndexTask#determineLockGranularityandTryLock}.
* {@link AbstractBatchIndexTask#determineLockGranularityAndTryLock}.
* - Provides some util methods for {@link LockGranularity#SEGMENT}. Also caches the information of locked segments when
* - the SEGMENt lock granularity is used.
*/

View File

@ -37,7 +37,7 @@ Integration Testing Using Docker
Before starting, if you don't already have docker on your machine, install it as described on
[Docker installation instructions](https://docs.docker.com/install/). Ensure that you
have at least 4GB of memory allocated to the docker engine. (You can verify it
under Preferences > Advanced.)
under Preferences > Resources > Advanced.)
Also set the `DOCKER_IP`
environment variable to localhost on your system, as follows:
@ -72,31 +72,40 @@ Druid routers for security group integration test (permissive tls, no client aut
## Docker compose
Docker compose yamls located in "docker" folder
There are a few different Docker compose yamls located in "docker" folder. Before you can run any of these, you must
build the Docker images. See "Manually bringing up Docker containers and running tests" below.
docker-compose.base.yml - Base file that defines all containers for integration test
docker-compose.yml - Defines Druid cluster with default configuration that is used for running integration tests in Travis CI.
docker-compose -f docker-compose.yml up
// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up
```
docker-compose -f docker-compose.yml up
// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests.
DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up
```
docker-compose.override-env.yml - Defines Druid cluster with default configuration plus any additional and/or overriden configurations from override-env file.
// OVERRIDE_ENV - variable that must contains path to Druid configuration file
OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up
```
// OVERRIDE_ENV - variable that must contains path to Druid configuration file
OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up
```
docker-compose.security.yml - Defines three additional Druid router services with permissive tls, no client auth tls, and custom check tls respectively.
This is meant to be use together with docker-compose.yml or docker-compose.override-env.yml and is only needed for the "security" group integration test.
docker-compose -f docker-compose.yml -f docker-compose.security.yml up
```
docker-compose -f docker-compose.yml -f docker-compose.security.yml up
```
docker-compose.druid-hadoop.yml - for starting Apache Hadoop 2.8.5 cluster with the same setup as the Druid tutorial
docker-compose -f docker-compose.druid-hadoop.yml up
```
docker-compose -f docker-compose.druid-hadoop.yml up
```
## Manual bringing up docker containers and running tests
## Manually bringing up Docker containers and running tests
1. Build druid-cluster, druid-hadoop docker images. From root module run maven command:
```
@ -106,30 +115,38 @@ mvn clean install -pl integration-tests -P integration-tests -Ddocker.run.skip=t
2. Run druid cluster by docker-compose:
```
- Basic Druid cluster (skip this if running Druid cluster with override configs):
# Basic Druid cluster (skip this if running Druid cluster with override configs):
docker-compose -f integration-tests/docker/docker-compose.yml up
- Druid cluster with override configs (skip this if running Basic Druid cluster):
# Druid cluster with override configs (skip this if running Basic Druid cluster):
OVERRIDE_ENV=<PATH_TO_ENV> docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up
- Druid hadoop (if needed):
# Druid hadoop (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up
- Druid routers for security group integration test (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
# Druid routers for security group integration test (if needed):
docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up
```
3. Run maven command to execute tests with -Ddocker.build.skip=true -Ddocker.run.skip=true
For example:
```
mvn verify -P integration-tests -pl integration-tests -Dit.test=ITIndexerTest -Ddocker.build.skip=true -Ddocker.run.skip=true
```
## Tips & tricks for debugging and developing integration tests
### Useful mvn command flags
- -Ddocker.build.skip=true to skip build druid containers.
If you do not apply any change to druid then you can do not rebuild druid.
This can save ~4 minutes to build druid cluster and druid hadoop.
- -Ddocker.build.skip=true to skip building the containers.
If you do not apply any change to Druid then you skip rebuilding the containers. This can save ~4 minutes.
You need to build druid containers only once, after you can skip docker build step.
- -Ddocker.run.skip=true to skip starting docker containers. This can save ~3 minutes by skipping building and bringing
up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc). Please make sure that you actually do have
these containers already running if using this flag. Additionally, please make sure that the running containers
are in the same state that the setup script (run_cluster.sh) would have brought it up in.
are in the same state that the setup script (run_cluster.sh) would have brought it up in.
### Debugging Druid while running tests

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
echo $DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH
export DIR=$(cd $(dirname $0) && pwd)

View File

@ -94,11 +94,12 @@ EXPOSE 8300 8301 8302 8303 8304 8305
EXPOSE 9092 9093
WORKDIR /var/lib/druid
ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
&& . /druid.sh \
# Create druid service config files with all the config variables
&& . /druid.sh; setupConfig \
&& setupConfig \
# Some test groups require pre-existing data to be setup
&& . /druid.sh; setupData \
&& setupData \
# Export the service config file path to use in supervisord conf file
&& export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \
# Export the common config file path to use in supervisord conf file

View File

@ -45,7 +45,7 @@ services:
- 51111:51111
networks:
druid-it-net:
ipv4_address: 172.172.172.13
ipv4_address: 172.172.172.15
privileged: true
volumes:
- ${HOME}/shared:/shared
@ -173,12 +173,31 @@ services:
- ./environment-configs/common
- ./environment-configs/middlemanager
druid-indexer:
image: druid/cluster
container_name: druid-indexer
networks:
druid-it-net:
ipv4_address: 172.172.172.8
ports:
- 5008:5008
- 8091:8091
- 8291:8291
privileged: true
volumes:
- ./../src/test/resources:/resources
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/indexer
druid-broker:
image: druid/cluster
container_name: druid-broker
networks:
druid-it-net:
ipv4_address: 172.172.172.8
ipv4_address: 172.172.172.9
ports:
- 5005:5005
- 8082:8082
@ -196,7 +215,7 @@ services:
container_name: druid-router
networks:
druid-it-net:
ipv4_address: 172.172.172.9
ipv4_address: 172.172.172.10
ports:
- 5004:5004
- 8888:8888
@ -214,7 +233,7 @@ services:
container_name: druid-router-permissive-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.10
ipv4_address: 172.172.172.11
ports:
- 5001:5001
- 8889:8889
@ -232,7 +251,7 @@ services:
container_name: druid-router-no-client-auth-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.11
ipv4_address: 172.172.172.12
ports:
- 5002:5002
- 8890:8890
@ -250,7 +269,7 @@ services:
container_name: druid-router-custom-check-tls
networks:
druid-it-net:
ipv4_address: 172.172.172.12
ipv4_address: 172.172.172.13
ports:
- 5003:5003
- 8891:8891

View File

@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.2"
services:
druid-zookeeper-kafka:
extends:
file: docker-compose.base.yml
service: druid-zookeeper-kafka
druid-metadata-storage:
extends:
file: docker-compose.base.yml
service: druid-metadata-storage
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-zookeeper-kafka
druid-overlord:
extends:
file: docker-compose.base.yml
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
druid-coordinator:
extends:
file: docker-compose.base.yml
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
- druid-zookeeper-kafka
druid-historical:
extends:
file: docker-compose.base.yml
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
druid-indexer:
extends:
file: docker-compose.base.yml
service: druid-indexer
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-overlord:druid-overlord
depends_on:
- druid-zookeeper-kafka
- druid-overlord
druid-broker:
extends:
file: docker-compose.base.yml
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-indexer:druid-indexer
- druid-historical:druid-historical
depends_on:
- druid-zookeeper-kafka
- druid-indexer
- druid-historical
druid-router:
extends:
file: docker-compose.base.yml
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator
- druid-broker
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24

View File

@ -101,7 +101,7 @@ services:
container_name: druid-historical-for-query-retry-test
networks:
druid-it-net:
ipv4_address: 172.172.172.13
ipv4_address: 172.172.172.14
ports:
- 8084:8083
- 8284:8283

View File

@ -25,6 +25,7 @@ getConfPath()
historical) echo $cluster_conf_base/data/historical ;;
historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;;
middleManager) echo $cluster_conf_base/data/middleManager ;;
indexer) echo $cluster_conf_base/data/indexer ;;
coordinator) echo $cluster_conf_base/master/coordinator ;;
broker) echo $cluster_conf_base/query/broker ;;
router) echo $cluster_conf_base/query/router ;;

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=broker
DRUID_LOG_PATH=/shared/logs/broker.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx256m -Xms256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
SERVICE_DRUID_JAVA_OPTS=-server -Xmx192m -Xms192m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
# Druid configs
druid_processing_buffer_sizeBytes=25000000

View File

@ -22,7 +22,7 @@ LANGUAGE=C.UTF-8
LC_ALL=C.UTF-8
# JAVA OPTS
COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml
COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
DRUID_DEP_LIB_DIR=/shared/hadoop_xml:/shared/docker/lib/*:/usr/local/druid/lib/mysql-connector-java.jar
# Druid configs

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=coordinator
DRUID_LOG_PATH=/shared/logs/coordinator.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
# Druid configs
druid_metadata_storage_type=mysql

View File

@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
DRUID_SERVICE=indexer
DRUID_LOG_PATH=/shared/logs/indexer.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
# Druid configs
druid_server_http_numThreads=4
druid_storage_storageDirectory=/shared/storage
druid_processing_buffer_sizeBytes=25000000
druid_processing_numThreads=1
druid_selectors_indexing_serviceName=druid/overlord
druid_indexer_task_chathandler_type=announce
druid_auth_basic_common_cacheDirectory=/tmp/authCache/indexer
druid_startup_logging_logProperties=true
druid_server_https_crlPath=/tls/revocations.crl
druid_worker_capacity=10

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=overlord
DRUID_LOG_PATH=/shared/logs/overlord.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009
# Druid configs
druid_metadata_storage_type=mysql

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=router
DRUID_LOG_PATH=/shared/logs/router.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004
# Druid configs
druid_auth_basic_common_cacheDirectory=/tmp/authCache/router

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=router
DRUID_LOG_PATH=/shared/logs/router-custom-check-tls.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003
# Druid configs
druid_plaintextPort=8891

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=router
DRUID_LOG_PATH=/shared/logs/router-no-client-auth-tls.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002
# Druid configs
druid_plaintextPort=8890

View File

@ -21,7 +21,7 @@ DRUID_SERVICE=router
DRUID_LOG_PATH=/shared/logs/router-permissive-tls.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001
SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001
# Druid configs
druid_plaintextPort=8889

View File

@ -16,11 +16,17 @@
# limitations under the License.
DOCKER_HOST_IP="$(host "$(hostname)" | perl -nle '/has address (.*)/ && print $1')"
if [ -z "$DOCKER_HOST_IP" ]; then
# Mac specific way to get host ip
DOCKER_HOST_IP="$(dscacheutil -q host -a name "$(HOSTNAME)" | perl -nle '/ip_address: (.*)/ && print $1' | tail -n1)"
fi
if [ -z "$DOCKER_HOST_IP" ]; then
# Another Mac specific way, when the machine isn't able to resolve its own name
DOCKER_HOST_IP="$(ifconfig | fgrep 'inet ' | fgrep -v 127.0.0.1 | awk '{print $2}')"
fi
if [ -z "$DOCKER_HOST_IP" ]; then
>&2 echo "Could not set docker host IP - integration tests can not run"
exit 1

View File

@ -370,6 +370,7 @@
<start.hadoop.docker>false</start.hadoop.docker>
<docker.run.skip>false</docker.run.skip>
<docker.build.skip>false</docker.build.skip>
<it.indexer>middleManager</it.indexer>
<override.config.path />
<resource.file.dir.path />
@ -397,6 +398,7 @@
<DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH>${resource.file.dir.path}</DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH>
<DRUID_INTEGRATION_TEST_SKIP_BUILD_DOCKER>${docker.build.skip}</DRUID_INTEGRATION_TEST_SKIP_BUILD_DOCKER>
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
</environmentVariables>
<executable>${project.basedir}/build_run_cluster.sh</executable>
</configuration>

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# wait for hadoop namenode to be up
echo "Waiting for hadoop namenode to be up"
docker exec -t druid-it-hadoop sh -c "./usr/local/hadoop/bin/hdfs dfs -mkdir -p /druid"
@ -41,4 +43,4 @@ echo "Finished setting up druid hadoop dirs"
echo "Copying Hadoop XML files to shared"
docker exec -t druid-it-hadoop sh -c "cp /usr/local/hadoop/etc/hadoop/*.xml /shared/hadoop_xml"
echo "Copied Hadoop XML files to shared"
echo "Copied Hadoop XML files to shared"

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# setup client keystore
./docker/tls/generate-client-certs-and-keystores.sh
rm -rf docker/client_tls

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# Build Druid Cluster Image
set -e

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# Create docker network
{
docker network create --subnet=172.172.172.0/24 druid-it-net
@ -45,21 +47,40 @@ fi
docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up -d
fi
# Start Druid services
if [ -z "$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH" ]
then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
# Start default Druid services with an additional historical modified for query retry test
# See CliHistoricalForQueryRetryTest.
docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
else
# Start default Druid services
docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
fi
# Sanity check: DRUID_INTEGRATION_TEST_INDEXER must be "indexer" or "middleManager"
if [ "$DRUID_INTEGRATION_TEST_INDEXER" != "indexer" ] && [ "$DRUID_INTEGRATION_TEST_INDEXER" != "middleManager" ]
then
echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
exit 1
fi
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
then
# Sanity check: cannot combine CliIndexer tests with security, query-retry tests
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
exit 1
fi
# Replace MiddleManager with Indexer
docker-compose -f ${DOCKERDIR}/docker-compose.cli-indexer.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
# Start default Druid services with an additional historical modified for query retry test
# See CliHistoricalForQueryRetryTest.
docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
else
# Start default Druid services
docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
fi
else
# run druid cluster with override config
OVERRIDE_ENV=$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up -d

View File

@ -223,10 +223,10 @@ public class OverlordResourceTestClient
public void waitUntilTaskCompletes(final String taskID)
{
waitUntilTaskCompletes(taskID, 10000, 60);
waitUntilTaskCompletes(taskID, ITRetryUtil.DEFAULT_RETRY_SLEEP, ITRetryUtil.DEFAULT_RETRY_COUNT);
}
public void waitUntilTaskCompletes(final String taskID, final int millisEach, final int numTimes)
public void waitUntilTaskCompletes(final String taskID, final long millisEach, final int numTimes)
{
ITRetryUtil.retryUntil(
new Callable<Boolean>()
@ -254,7 +254,7 @@ public class OverlordResourceTestClient
}
public void waitUntilTaskFails(final String taskID, final int millisEach, final int numTimes)
public void waitUntilTaskFails(final String taskID, final long millisEach, final int numTimes)
{
ITRetryUtil.retryUntil(
new Callable<Boolean>()

View File

@ -48,8 +48,8 @@ public class DruidClusterAdminClient
private static final Logger LOG = new Logger(DruidClusterAdminClient.class);
private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator";
private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical";
private static final String INDEXER_DOCKER_CONTAINER_NAME = "/druid-overlord";
private static final String BROKERR_DOCKER_CONTAINER_NAME = "/druid-broker";
private static final String OVERLORD_DOCKER_CONTAINER_NAME = "/druid-overlord";
private static final String BROKER_DOCKER_CONTAINER_NAME = "/druid-broker";
private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";
@ -79,14 +79,14 @@ public class DruidClusterAdminClient
restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME);
}
public void restartIndexerContainer()
public void restartOverlordContainer()
{
restartDockerContainer(INDEXER_DOCKER_CONTAINER_NAME);
restartDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME);
}
public void restartBrokerContainer()
{
restartDockerContainer(BROKERR_DOCKER_CONTAINER_NAME);
restartDockerContainer(BROKER_DOCKER_CONTAINER_NAME);
}
public void restartRouterContainer()

View File

@ -30,9 +30,9 @@ public class ITRetryUtil
private static final Logger LOG = new Logger(ITRetryUtil.class);
public static final int DEFAULT_RETRY_COUNT = 30;
public static final int DEFAULT_RETRY_COUNT = 150; // 5 minutes
public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(10);
public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(2);
public static void retryUntilTrue(Callable<Boolean> callable, String task)
{

View File

@ -54,6 +54,8 @@ public class TestNGGroup
public static final String QUERY_RETRY = "query-retry";
public static final String CLI_INDEXER = "cli-indexer";
public static final String REALTIME_INDEX = "realtime-index";
/**

View File

@ -215,7 +215,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartIndexerContainer(),
() -> druidClusterAdminClient.restartOverlordContainer(),
() -> druidClusterAdminClient.waitUntilIndexerReady(),
transactionEnabled
);

View File

@ -11,7 +11,7 @@
},
{
"query": {
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL"
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL AND server_type <> 'indexer'"
},
"expectedResults": [
{

View File

@ -16,7 +16,7 @@
},
{
"query": {
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL"
"query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL AND server_type <> 'indexer'"
},
"expectedResults": [
{

View File

@ -10,8 +10,8 @@
"max_size": 5000000000
},
{
"server": "172.172.172.8:8282",
"host": "172.172.172.8",
"server": "172.172.172.9:8282",
"host": "172.172.172.9",
"plaintext_port": 8082,
"tls_port": 8282,
"server_type": "broker",

View File

@ -14,16 +14,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# Skip stopping docker if flag set (For use during development)
if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" == true ]
then
exit 0
fi
for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-indexer druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
do
docker stop $node
docker rm $node
CONTAINER="$(docker ps -aq -f name=${node})"
if [ ! -z "$CONTAINER" ]
then
docker stop $node
docker rm $node
fi
done
docker network rm druid-it-net
if [ ! -z "$(docker network ls -q -f name=druid-it-net)" ]
then
docker network rm druid-it-net
fi

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.client.CachingQueryRunner;
@ -295,6 +296,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
);
}
@VisibleForTesting
String getDataSource()
{
return dataSource;
}
/**
* Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
* each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.

View File

@ -34,6 +34,7 @@ import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@ -42,7 +43,9 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
@ -258,14 +261,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
Iterable<Interval> intervals
)
{
DatasourceBundle datasourceBundle;
synchronized (this) {
datasourceBundle = datasourceBundles.get(query.getDataSource().toString());
if (datasourceBundle == null) {
throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString());
}
}
return datasourceBundle.getWalker().getQueryRunnerForIntervals(query, intervals);
return getBundle(query).getWalker().getQueryRunnerForIntervals(query, intervals);
}
@Override
@ -274,14 +270,29 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
Iterable<SegmentDescriptor> specs
)
{
DatasourceBundle datasourceBundle;
return getBundle(query).getWalker().getQueryRunnerForSegments(query, specs);
}
@VisibleForTesting
<T> DatasourceBundle getBundle(final Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final TableDataSource table =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
final DatasourceBundle bundle;
synchronized (this) {
datasourceBundle = datasourceBundles.get(query.getDataSource().toString());
if (datasourceBundle == null) {
throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString());
}
bundle = datasourceBundles.get(table.getName());
}
return datasourceBundle.getWalker().getQueryRunnerForSegments(query, specs);
if (bundle == null) {
throw new IAE("Could not find segment walker for datasource [%s]", table.getName());
}
return bundle;
}
@Override

View File

@ -20,7 +20,11 @@
package org.apache.druid.server.coordination;
/**
* Use announcement made by {@link org.apache.druid.discovery.DruidNodeAnnouncer}
* We are gradually migrating usages of this to {@link org.apache.druid.discovery.DruidNodeAnnouncer}.
*
* However, it's still required in some cases. As of this writing (2020-12-03) it's required for any process that
* is serving queryable segments via Curator-based segment discovery. (When using Curator for segment discovery, Brokers
* look for these announcements as part of discovering what segments are available.)
*/
@Deprecated
public interface DataSegmentServerAnnouncer

View File

@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@ -85,6 +86,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile boolean started = false;
@ -139,8 +141,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.exec = exec;
this.serverTypeConfig = serverTypeConfig;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
@ -157,6 +160,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
try {
if (!config.getLocations().isEmpty()) {
loadLocalCache();
}
if (shouldAnnounce()) {
serverAnnouncer.announce();
}
}
@ -179,7 +185,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Stopping...");
try {
if (!config.getLocations().isEmpty()) {
if (shouldAnnounce()) {
serverAnnouncer.unannounce();
}
}
@ -258,7 +264,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
*
* @throws SegmentLoadingException if it fails to load the given segment
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
throws SegmentLoadingException
{
final boolean loaded;
try {
@ -566,6 +573,21 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
}
/**
* Returns whether or not we should announce ourselves as a data server using {@link DataSegmentServerAnnouncer}.
*
* Returns true if _either_:
*
* (1) Our {@link #serverTypeConfig} indicates we are a segment server. This is necessary for Brokers to be able
* to detect that we exist.
* (2) We have non-empty storage locations in {@link #config}. This is necessary for Coordinators to be able to
* assign segments to us.
*/
private boolean shouldAnnounce()
{
return serverTypeConfig.getServerType().isSegmentServer() || !config.getLocations().isEmpty();
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);

View File

@ -47,14 +47,46 @@ import org.apache.druid.java.util.common.StringUtils;
*/
public enum ServerType
{
HISTORICAL,
BRIDGE,
HISTORICAL {
@Override
public boolean isSegmentReplicationTarget()
{
return true;
}
@Override
public boolean isSegmentServer()
{
return true;
}
},
BRIDGE {
@Override
public boolean isSegmentReplicationTarget()
{
return true;
}
@Override
public boolean isSegmentServer()
{
return true;
}
},
INDEXER_EXECUTOR {
@Override
public boolean isSegmentReplicationTarget()
{
return false;
}
@Override
public boolean isSegmentServer()
{
return true;
}
},
REALTIME {
@ -63,6 +95,12 @@ public enum ServerType
{
return false;
}
@Override
public boolean isSegmentServer()
{
return true;
}
},
BROKER {
@ -71,6 +109,12 @@ public enum ServerType
{
return false;
}
@Override
public boolean isSegmentServer()
{
return false;
}
};
/**
@ -80,10 +124,7 @@ public enum ServerType
*
* @see org.apache.druid.server.coordinator.rules.LoadRule
*/
public boolean isSegmentReplicationTarget()
{
return true;
}
public abstract boolean isSegmentReplicationTarget();
/**
* Indicates this type of node is able to be a target of segment broadcast.
@ -95,6 +136,13 @@ public enum ServerType
return true;
}
/**
* Indicates this type of node is serving segments that are meant to be the target of fan-out by a Broker.
*
* Nodes that return "true" here are often referred to as "data servers" or "data server processes".
*/
public abstract boolean isSegmentServer();
@JsonCreator
public static ServerType fromString(String type)
{

View File

@ -38,7 +38,10 @@ import java.io.IOException;
import java.util.concurrent.ExecutorService;
/**
* Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops.
* We are gradually migrating to {@link org.apache.druid.server.http.SegmentListerResource} for driving segment
* loads/drops on data server processes.
*
* However, this class is still the default mechanism as of this writing (2020-12-03).
*/
@Deprecated
public class ZkCoordinator

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
public class UnifiedIndexerAppenderatorsManagerTest
{
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager(
Execs.directExecutor(),
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(10),
new CacheConfig(),
new CachePopulatorStats(),
TestHelper.makeJsonMapper(),
new NoopServiceEmitter(),
() -> new DefaultQueryRunnerFactoryConglomerate(ImmutableMap.of())
);
private final Appenderator appenderator = manager.createOfflineAppenderatorForTask(
"taskId",
new DataSchema(
"myDataSource",
new TimestampSpec("__time", "millis", null),
null,
null,
new UniformGranularitySpec(Granularities.HOUR, Granularities.HOUR, false, Collections.emptyList()),
null
),
EasyMock.createMock(AppenderatorConfig.class),
new FireDepartmentMetrics(),
new NoopDataSegmentPusher(),
TestHelper.makeJsonMapper(),
TestHelper.getTestIndexIO(),
TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()),
new NoopRowIngestionMeters(),
new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0)
);
@Test
public void test_getBundle_knownDataSource()
{
final UnifiedIndexerAppenderatorsManager.DatasourceBundle bundle = manager.getBundle(
Druids.newScanQueryBuilder()
.dataSource(appenderator.getDataSource())
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.build()
);
Assert.assertEquals("myDataSource", bundle.getWalker().getDataSource());
}
@Test
public void test_getBundle_unknownDataSource()
{
final ScanQuery query = Druids.newScanQueryBuilder()
.dataSource("unknown")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.build();
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Could not find segment walker for datasource");
manager.getBundle(query);
}
}