From 16f5ae440510951b247920d41833da98804897d0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn <52679095+maytasm@users.noreply.github.com> Date: Wed, 22 Apr 2020 07:43:34 -1000 Subject: [PATCH] Add integration tests for kafka ingestion (#9724) * add kafka admin and kafka writer * refactor kinesis IT * fix typo refactor * parallel * parallel * parallel * parallel works now * add kafka it * add doc to readme * fix tests * fix failing test * test * test * test * test * address comments * addressed comments --- .travis.yml | 33 +- integration-tests/README.md | 23 +- integration-tests/pom.xml | 38 +- integration-tests/run_cluster.sh | 6 + .../clients/OverlordResourceTestClient.java | 10 + .../testing/guice/DruidTestModuleFactory.java | 1 - .../druid/testing/utils/KafkaAdminClient.java | 108 ++++ .../druid/testing/utils/KafkaEventWriter.java | 123 +++++ .../testing/utils/KinesisAdminClient.java | 24 +- .../testing/utils/KinesisEventWriter.java | 18 + .../testing/utils/StreamAdminClient.java | 44 ++ .../testing/utils/StreamEventWriter.java | 13 +- .../druid/testing/utils/StreamGenerator.java | 4 +- .../druid/testing/utils/SuiteListener.java | 65 +++ .../utils/SyntheticStreamGenerator.java | 14 +- .../org/testng/DruidTestRunnerFactory.java | 109 ---- .../org/apache/druid/tests/TestNGGroup.java | 6 + .../duty/ITAutoCompactionTest.java | 3 + .../indexer/AbstractKafkaIndexerTest.java | 317 ------------ .../AbstractKafkaIndexingServiceTest.java | 159 ++++++ .../AbstractKinesisIndexingServiceTest.java | 150 ++++++ .../indexer/AbstractStreamIndexingTest.java | 450 ++++++++++++++++ ...ServiceNonTransactionalSerializedTest.java | 83 +++ .../indexer/ITKafkaIndexingServiceTest.java | 63 --- ...ingServiceTransactionalSerializedTest.java | 83 +++ ...KafkaIndexingServiceTransactionalTest.java | 66 --- ...TKinesisIndexingServiceSerializedTest.java | 77 +++ .../indexer/ITKinesisIndexingServiceTest.java | 480 ------------------ ...rviceNonTransactionalParallelizedTest.java | 97 ++++ ...gServiceTransactionalParallelizedTest.java | 97 ++++ ...inesisIndexingServiceParallelizedTest.java | 101 ++++ .../src/test/resources/testng.xml | 27 +- integration-tests/stop_cluster.sh | 6 + pom.xml | 3 +- 34 files changed, 1803 insertions(+), 1098 deletions(-) create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java delete mode 100644 integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java delete mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java delete mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java create mode 100644 
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java delete mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java delete mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java diff --git a/.travis.yml b/.travis.yml index 45caa749a7a..93ddf47e5be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -313,6 +313,30 @@ jobs: script: *run_integration_test after_failure: *integration_test_diags + - &integration_kafka_index_slow + name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' + script: *run_integration_test + after_failure: *integration_test_diags + + - &integration_kafka_transactional_index + name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' + script: *run_integration_test + after_failure: *integration_test_diags + + - &integration_kafka_transactional_index_slow + name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' + script: *run_integration_test + after_failure: *integration_test_diags + - &integration_query name: "(Compile=openjdk8, Run=openjdk8) query integration test" jdk: openjdk8 @@ -341,7 +365,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) other integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' script: *run_integration_test after_failure: *integration_test_diags # END - Integration tests for Compile with Java 8 and Run with Java 8 @@ -357,11 +381,6 @@ jobs: jdk: openjdk8 env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11' - - <<: *integration_kafka_index - name: "(Compile=openjdk8, Run=openjdk11) kafka index integration test" - jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=11' - - <<: *integration_query name: "(Compile=openjdk8, Run=openjdk11) query integration test" jdk: 
openjdk8 @@ -380,7 +399,7 @@ jobs: - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11' # END - Integration tests for Compile with Java 8 and Run with Java 11 - name: "security vulnerabilities" diff --git a/integration-tests/README.md b/integration-tests/README.md index baadd3a7bd1..f390e858680 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -68,7 +68,18 @@ can either be 8 or 11. Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=. The file must contain one property per line, the key must start with `druid_`, and the format should be snake case. -## Debugging Druid while running tests +## Tips & tricks for debugging and developing integration tests + +### Useful mvn command flags + +- -Dskip.start.docker=true to skip starting the Docker containers. This can save ~3 minutes by skipping building and bringing +up the Docker containers (Druid, Kafka, Hadoop, MySQL, ZooKeeper, etc.). Please make sure that you actually do have +these containers already running if using this flag. Additionally, please make sure that the running containers +are in the same state that the setup script (run_cluster.sh) would have brought them up in. +- -Dskip.stop.docker=true to skip stopping and tearing down the Docker containers. This can be useful for further +debugging after the integration tests have finished running. See the example invocation below. + +### Debugging Druid while running tests For your convenience, Druid processes running inside Docker have debugging enabled and the following ports have been made available to attach your remote debugger (such as via IntelliJ IDEA's Remote Configuration): @@ -303,3 +314,13 @@ This will tell the test framework that the test class needs to be constructed us 2) FromFileTestQueryHelper - reads queries with expected results from file and executes them and verifies the results using ResultVerifier Refer to ITIndexerTest as an example of how to use dependency injection + +### Running test methods in parallel +By default, test methods in a test class run sequentially, one at a time. Test methods for a given test +class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding +the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests" +test tag section in integration-tests/src/test/resources/testng.xml. +Please be mindful when adding tests to the "AllParallelizedTests" test tag that they must be able to run in parallel with +other tests from the same class at the same time, i.e. a test must not modify/restart/stop the Druid cluster or other dependency containers, +must not use excessive memory that starves other concurrent tasks, and must not modify or use a task, +supervisor, or datasource it did not create. 
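For example, a development loop against an already-running cluster could look like the following (a hypothetical invocation; the `integration-tests` Maven profile and the chosen test group are assumptions based on the rest of this README):

```bash
# Reuse the containers brought up by a previous run and leave them running afterwards
mvn verify -P integration-tests \
  -Dskip.start.docker=true \
  -Dskip.stop.docker=true \
  -Dgroups=kafka-index
```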
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 885419623d3..9c4c429f738 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -233,11 +233,6 @@ com.google.guava guava - - com.101tec - zkclient - 0.10 - javax.validation validation-api @@ -304,21 +299,6 @@ easymock test - - org.apache.kafka - kafka_2.12 - ${apache.kafka.version} - - - log4j - log4j - - - slf4j-log4j12 - org.slf4j - - - @@ -367,6 +347,8 @@ integration-tests false + false + false @@ -385,6 +367,7 @@ ${start.hadoop.docker} + ${skip.start.docker} ${jvm.runtime} ${groups} ${override.config.path} @@ -400,6 +383,9 @@ post-integration-test + + ${skip.stop.docker} + ${project.basedir}/stop_cluster.sh @@ -419,12 +405,6 @@ - - - testrunfactory - org.testng.DruidTestRunnerFactory - - -Duser.timezone=UTC -Dfile.encoding=UTF-8 @@ -477,12 +457,6 @@ - - - testrunfactory - org.testng.DruidTestRunnerFactory - - -Duser.timezone=UTC -Dfile.encoding=UTF-8 diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh index 122cf6a4d80..ba07e36d8f7 100755 --- a/integration-tests/run_cluster.sh +++ b/integration-tests/run_cluster.sh @@ -14,6 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Skip starting docker if flag set (For use during development) +if [ -n "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" == true ] + then + exit 0 + fi + # Cleanup old images/containers { for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop; diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 20f6a76540d..7995595bb2b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -43,6 +43,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -156,6 +157,15 @@ public class OverlordResourceTestClient return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource))); } + public List getUncompletedTasksForDataSource(final String dataSource) + { + List uncompletedTasks = new ArrayList<>(); + uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=pending&datasource=%s", StringUtils.urlEncode(dataSource)))); + uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=running&datasource=%s", StringUtils.urlEncode(dataSource)))); + uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=waiting&datasource=%s", StringUtils.urlEncode(dataSource)))); + return uncompletedTasks; + } + private List getTasks(String identifier) { try { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java index e082ffc9f66..2c3b59d9152 100644 --- 
a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java @@ -57,7 +57,6 @@ public class DruidTestModuleFactory implements IModuleFactory @Override public Module createModule(ITestContext context, Class<?> testClass) { - context.addGuiceModule(DruidTestModule.class, MODULE); context.addInjector(Collections.singletonList(MODULE), INJECTOR); return MODULE; } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java new file mode 100644 index 00000000000..d63d08833cd --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.google.common.collect.ImmutableList; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreatePartitionsResult; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class KafkaAdminClient implements StreamAdminClient +{ + private AdminClient adminClient; + + public KafkaAdminClient(String kafkaInternalHost) + { + Properties config = new Properties(); + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost); + adminClient = AdminClient.create(config); + } + + @Override + public void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception + { + final short replicationFactor = 1; + final NewTopic newTopic = new NewTopic(streamName, partitionCount, replicationFactor); + final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); + // Wait for topic creation to complete + createTopicsResult.values().get(streamName).get(); + } + + @Override + public void deleteStream(String streamName) throws Exception + { + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableList.of(streamName)); + deleteTopicsResult.values().get(streamName).get(); + } + + /** + * This method can only increase the partition count of {@param streamName} to have a final partition + * count of {@param newPartitionCount}. + * If 
{@param blocksUntilStarted} is set to true, then this method blocks until the repartitioning + * has started (but not necessarily finished); otherwise, it returns right after issuing the + * repartitioning command + */ + @Override + public void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception + { + Map<String, NewPartitions> counts = new HashMap<>(); + counts.put(streamName, NewPartitions.increaseTo(newPartitionCount)); + CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(counts); + if (blocksUntilStarted) { + createPartitionsResult.values().get(streamName).get(); + } + } + + /** + * Stream state such as active/inactive does not apply to Kafka. + * Returns true since a Kafka topic is always active and can always be written to and read from. + */ + @Override + public boolean isStreamActive(String streamName) + { + return true; + } + + @Override + public int getStreamPartitionCount(String streamName) throws Exception + { + DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(streamName)); + TopicDescription topicDescription = result.values().get(streamName).get(); + return topicDescription.partitions().size(); + } + + @Override + public boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception + { + return getStreamPartitionCount(streamName) == newPartitionCount; + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java new file mode 100644 index 00000000000..f7ec75507b9 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +import org.apache.druid.indexer.TaskIdUtils; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; + +public class KafkaEventWriter implements StreamEventWriter +{ + private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; + private final KafkaProducer<String, String> producer; + private final boolean txnEnabled; + private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>(); + + public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) + { + Properties properties = new Properties(); + addFilteredProperties(config, properties); + properties.setProperty("bootstrap.servers", config.getKafkaHost()); + properties.setProperty("acks", "all"); + properties.setProperty("retries", "3"); + properties.setProperty("key.serializer", ByteArraySerializer.class.getName()); + properties.setProperty("value.serializer", ByteArraySerializer.class.getName()); + this.txnEnabled = txnEnabled; + if (txnEnabled) { + properties.setProperty("enable.idempotence", "true"); + properties.setProperty("transactional.id", TaskIdUtils.getRandomId()); + } + this.producer = new KafkaProducer<>( + properties, + new StringSerializer(), + new StringSerializer() + ); + if (txnEnabled) { + producer.initTransactions(); + } + } + + @Override + public boolean isTransactionEnabled() + { + return txnEnabled; + } + + @Override + public void initTransaction() + { + if (txnEnabled) { + producer.beginTransaction(); + } else { + throw new IllegalStateException("Kafka writer was initialized with transaction disabled"); + } + } + + @Override + public void commitTransaction() + { + if (txnEnabled) { + producer.commitTransaction(); + } else { + throw new IllegalStateException("Kafka writer was initialized with transaction disabled"); + } + } + + @Override + public void write(String topic, String event) + { + Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, event)); + pendingWriteRecords.add(future); + } + + @Override + public void shutdown() + { + producer.close(); + } + + @Override + public void flush() throws Exception + { + for (Future<RecordMetadata> future : pendingWriteRecords) { + future.get(); + } + pendingWriteRecords.clear(); + } + + private void addFilteredProperties(IntegrationTestingConfig config, Properties properties) + { + for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) { + properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue()); + } + } + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java index bc5ace2d1b4..7c8759ae0e3 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java @@ -42,7 +42,7 @@ import java.io.FileInputStream; import java.util.Map; import java.util.Properties; -public class KinesisAdminClient +public class KinesisAdminClient 
implements StreamAdminClient { private AmazonKinesis amazonKinesis; @@ -70,6 +70,7 @@ )).build(); } + @Override public void createStream(String streamName, int shardCount, Map<String, String> tags) { CreateStreamResult createStreamResult = amazonKinesis.createStream(streamName, shardCount); @@ -88,6 +89,7 @@ } + @Override public void deleteStream(String streamName) { DeleteStreamResult deleteStreamResult = amazonKinesis.deleteStream(streamName); @@ -101,9 +103,10 @@ * If {@param blocksUntilStarted} is set to true, then this method blocks until the resharding * has started (but not necessarily finished); otherwise, it returns right after issuing the reshard command */ - public void updateShardCount(String streamName, int newShardCount, boolean blocksUntilStarted) + @Override + public void updatePartitionCount(String streamName, int newShardCount, boolean blocksUntilStarted) { - int originalShardCount = getStreamShardCount(streamName); + int originalShardCount = getStreamPartitionCount(streamName); UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest(); updateShardCountRequest.setStreamName(streamName); updateShardCountRequest.setTargetShardCount(newShardCount); @@ -129,18 +132,31 @@ } } + @Override public boolean isStreamActive(String streamName) { StreamDescription streamDescription = getStreamDescription(streamName); return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE); } - public int getStreamShardCount(String streamName) + @Override + public int getStreamPartitionCount(String streamName) { StreamDescription streamDescription = getStreamDescription(streamName); return getStreamShardCount(streamDescription); } + @Override + public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount, int newShardCount) + { + int actualShardCount = getStreamPartitionCount(streamName); + // Kinesis does not immediately drop the old shards after the resharding and hence + // still returns both open shards and closed shards from the API call. 
+ // To verify, we sum the old count (closed shards) and the expected new count (open shards) + return actualShardCount == oldShardCount + newShardCount; + } + + private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck) { return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus()); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java index 09950f13204..0377e9e42e0 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java @@ -63,6 +63,24 @@ public class KinesisEventWriter implements StreamEventWriter this.kinesisProducer = new KinesisProducer(kinesisProducerConfiguration); } + @Override + public boolean isTransactionEnabled() + { + return false; + } + + @Override + public void initTransaction() + { + // No-op as Kinesis does not support transactions + } + + @Override + public void commitTransaction() + { + // No-op as Kinesis does not support transactions + } + @Override public void write(String streamName, String event) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java new file mode 100644 index 00000000000..ea36d1c969a --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import java.util.Map; + +/** + * This interface provides the administrative client contract for any stream storage (such as Kafka and Kinesis) + * which supports managing and inspecting streams (aka topics) and their partitions (aka shards). + * This is used for setting up, tearing down, and making any other administrative changes required in integration tests. + * Each method resulting in a change of state for the stream is intended to be synchronous to help + * make integration tests deterministic and easy to write. 
+ */ +public interface StreamAdminClient +{ + void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception; + + void deleteStream(String streamName) throws Exception; + + void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception; + + boolean isStreamActive(String streamName); + + int getStreamPartitionCount(String streamName) throws Exception; + + boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception; +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java index 1bfd6b67591..5d25916b6f6 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java @@ -19,11 +19,22 @@ package org.apache.druid.testing.utils; + +/** + * This interface is used to write test event data to the underlying stream (such as Kafka or Kinesis). + * It can also be used with {@link StreamGenerator} to write a particular set of test data. + */ public interface StreamEventWriter { void write(String topic, String event); void shutdown(); - void flush(); + void flush() throws Exception; + + boolean isTransactionEnabled(); + + void initTransaction(); + + void commitTransaction(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java index a232c59a8d6..f2d1f489d88 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java @@ -23,9 +23,9 @@ import org.joda.time.DateTime; public interface StreamGenerator { - void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds); + void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds); - void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime); + void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime); void shutdown(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java new file mode 100644 index 00000000000..6259156b03b --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.google.inject.Injector; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.testng.ISuite; +import org.testng.ISuiteListener; + +public class SuiteListener implements ISuiteListener +{ + private static final Logger LOG = new Logger(SuiteListener.class); + + @Override + public void onStart(ISuite suite) + { + Injector injector = DruidTestModuleFactory.getInjector(); + IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class); + DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class); + + druidClusterAdminClient.waitUntilCoordinatorReady(); + druidClusterAdminClient.waitUntilIndexerReady(); + druidClusterAdminClient.waitUntilBrokerReady(); + String routerHost = config.getRouterUrl(); + if (null != routerHost) { + druidClusterAdminClient.waitUntilRouterReady(); + } + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + try { + lifecycle.start(); + } + catch (Exception e) { + LOG.error(e, ""); + throw new RuntimeException(e); + } + } + + @Override + public void onFinish(ISuite suite) + { + Injector injector = DruidTestModuleFactory.getInjector(); + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.stop(); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java index 748a6ed2d3b..f2bfde857c6 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java @@ -61,13 +61,13 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator abstract Object getEvent(int row, DateTime timestamp); @Override - public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds) + public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds) { - start(streamTopic, streamEventWriter, totalNumberOfSeconds, null); + run(streamTopic, streamEventWriter, totalNumberOfSeconds, null); } @Override - public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime) + public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime) { // The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond] // or the [overrrideFirstEventTime] as the primary timestamp. 
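The comment above describes how events are batched per wall-clock second; the hunk that follows wraps each such batch in a transaction when the writer supports one. Below is a minimal sketch of that pattern against the StreamEventWriter contract added in this patch (the class and method names of the sketch itself are illustrative, not part of the patch):

```java
import java.util.List;

import org.apache.druid.testing.utils.StreamEventWriter;

class TransactionalBatchExample
{
  // Writes one batch of events, wrapping it in a transaction when the writer supports one
  // (a KafkaEventWriter constructed with txnEnabled=true); KinesisEventWriter reports
  // transactions as disabled, so the guards below skip the transaction calls for it.
  static void writeBatch(StreamEventWriter writer, String topic, List<String> events) throws Exception
  {
    if (writer.isTransactionEnabled()) {
      writer.initTransaction();   // begins a Kafka producer transaction
    }
    for (String event : events) {
      writer.write(topic, event); // asynchronous send; the writer tracks the pending Future
    }
    if (writer.isTransactionEnabled()) {
      writer.commitTransaction(); // commits the whole batch atomically
    }
    writer.flush();               // blocks until every pending send has completed
  }
}
```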
@@ -94,6 +94,10 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs) ); + if (streamEventWriter.isTransactionEnabled()) { + streamEventWriter.initTransaction(); + } + for (int i = 1; i <= eventsPerSecond; i++) { streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp))); @@ -107,6 +111,10 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator } } + if (streamEventWriter.isTransactionEnabled()) { + streamEventWriter.commitTransaction(); + } + nowCeilingToSecond = nowCeilingToSecond.plusSeconds(1); eventTimestamp = eventTimestamp.plusSeconds(1); seconds++; diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java deleted file mode 100644 index 53ef663b5c3..00000000000 --- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package /*CHECKSTYLE.OFF: PackageName*/org.testng/*CHECKSTYLE.ON: PackageName*/; - -import com.google.inject.Injector; -import org.apache.druid.java.util.common.lifecycle.Lifecycle; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.DruidClusterAdminClient; -import org.testng.internal.IConfiguration; -import org.testng.internal.annotations.IAnnotationFinder; -import org.testng.xml.XmlTest; - -import java.util.List; - -/** - * This class must be in package org.testng to access protected methods like TestNG.getDefault().getConfiguration() - */ -public class DruidTestRunnerFactory implements ITestRunnerFactory -{ - private static final Logger LOG = new Logger(DruidTestRunnerFactory.class); - - @Override - public TestRunner newTestRunner(ISuite suite, XmlTest test, List listeners) - { - IConfiguration configuration = TestNG.getDefault().getConfiguration(); - String outputDirectory = suite.getOutputDirectory(); - IAnnotationFinder annotationFinder = configuration.getAnnotationFinder(); - Boolean skipFailedInvocationCounts = suite.getXmlSuite().skipFailedInvocationCounts(); - return new DruidTestRunner( - configuration, - suite, - test, - outputDirectory, - annotationFinder, - skipFailedInvocationCounts, - listeners - ); - } - - private static class DruidTestRunner extends TestRunner - { - - protected DruidTestRunner( - IConfiguration configuration, - ISuite suite, - XmlTest test, - String outputDirectory, - IAnnotationFinder finder, - boolean skipFailedInvocationCounts, - List invokedMethodListeners - ) - { - super(configuration, suite, test, outputDirectory, finder, skipFailedInvocationCounts, invokedMethodListeners); - } - - @Override - public void run() - { - Injector injector = DruidTestModuleFactory.getInjector(); - IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class); - DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class); - - druidClusterAdminClient.waitUntilCoordinatorReady(); - druidClusterAdminClient.waitUntilIndexerReady(); - druidClusterAdminClient.waitUntilBrokerReady(); - String routerHost = config.getRouterUrl(); - if (null != routerHost) { - druidClusterAdminClient.waitUntilRouterReady(); - } - Lifecycle lifecycle = injector.getInstance(Lifecycle.class); - try { - lifecycle.start(); - runTests(); - } - catch (Exception e) { - LOG.error(e, ""); - throw new RuntimeException(e); - } - finally { - lifecycle.stop(); - } - - } - - private void runTests() - { - super.run(); - } - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index c0116f9149e..9d8b7fe0b4e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -31,6 +31,12 @@ public class TestNGGroup public static final String KAFKA_INDEX = "kafka-index"; + public static final String KAFKA_INDEX_SLOW = "kafka-index-slow"; + + public static final String TRANSACTIONAL_KAFKA_INDEX = "kafka-transactional-index"; + + public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow"; + public static final String OTHER_INDEX = "other-index"; public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = 
"perfect-rollup-parallel-batch-index"; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 6c57e219c1c..854c79ee097 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -243,6 +243,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest null); compactionResource.submitCompactionConfig(compactionConfig); + // Wait for compaction config to persist + Thread.sleep(2000); + // Verify that the compaction config is updated correctly. CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); DataSourceCompactionConfig foundDataSourceCompactionConfig = null; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java deleted file mode 100644 index a20254d2cf0..00000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.tests.indexer; - -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.io.IOUtils; -import org.apache.druid.indexer.TaskIdUtils; -import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.utils.ITRetryUtil; -import org.apache.druid.testing.utils.TestQueryHelper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Properties; - -abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest -{ - private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class); - protected static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json"; - protected static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json"; - private static final String QUERIES_FILE = "/indexer/stream_index_queries.json"; - private static final String TOPIC_NAME = "kafka_indexing_service_topic"; - - private static final int NUM_EVENTS_TO_SEND = 60; - private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L; - private static final String TEST_PROPERTY_PREFIX = "kafka.test.property."; - - // We'll fill in the current time and numbers for added, deleted and changed - // before sending the event. 
- private static final String EVENT_TEMPLATE = - "{\"timestamp\": \"%s\"," + - "\"page\": \"Gypsy Danger\"," + - "\"language\" : \"en\"," + - "\"user\" : \"nuclear\"," + - "\"unpatrolled\" : \"true\"," + - "\"newPage\" : \"true\"," + - "\"robot\": \"false\"," + - "\"anonymous\": \"false\"," + - "\"namespace\":\"article\"," + - "\"continent\":\"North America\"," + - "\"country\":\"United States\"," + - "\"region\":\"Bay Area\"," + - "\"city\":\"San Francisco\"," + - "\"added\":%d," + - "\"deleted\":%d," + - "\"delta\":%d}"; - - private ZkUtils zkUtils; - private boolean segmentsExist; // to tell if we should remove segments during teardown - - // format for the querying interval - private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); - // format for the expected timestamp in a query response - private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); - - @Inject - private TestQueryHelper queryHelper; - @Inject - private IntegrationTestingConfig config; - - private String fullDatasourceName; - - void doKafkaIndexTest(String dataSourceName, String supervisorSpecPath, boolean txnEnabled) - { - fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix(); - // create topic - try { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - String zkHosts = config.getZookeeperHosts(); - ZkClient zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); - zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - if (config.manageKafkaTopic()) { - int numPartitions = 4; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic( - zkUtils, - TOPIC_NAME, - numPartitions, - replicationFactor, - topicConfig, - RackAwareMode.Disabled$.MODULE$ - ); - } - } - catch (Exception e) { - throw new ISE(e, "could not create kafka topic"); - } - - String spec; - try { - LOG.info("supervisorSpec name: [%s]", supervisorSpecPath); - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); - final Properties consumerProperties = new Properties(); - consumerProperties.putAll(consumerConfigs); - consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); - - spec = getResourceAsString(supervisorSpecPath); - spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName); - spec = StringUtils.replace(spec, "%%STREAM_TYPE%%", "kafka"); - spec = StringUtils.replace(spec, "%%TOPIC_KEY%%", "topic"); - spec = StringUtils.replace(spec, "%%TOPIC_VALUE%%", TOPIC_NAME); - spec = StringUtils.replace(spec, "%%USE_EARLIEST_KEY%%", "useEarliestOffset"); - spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_KEY%%", "consumerProperties"); - spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_VALUE%%", jsonMapper.writeValueAsString(consumerProperties)); - LOG.info("supervisorSpec: [%s]\n", spec); - } - catch (Exception e) { - LOG.error("could not read file [%s]", supervisorSpecPath); - throw new ISE(e, "could not read file [%s]", supervisorSpecPath); - } - - // start supervisor - String supervisorId = indexer.submitSupervisor(spec); - LOG.info("Submitted supervisor"); - - // set up kafka producer - Properties properties = new Properties(); - addFilteredProperties(config, properties); - properties.setProperty("bootstrap.servers", config.getKafkaHost()); - LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); - 
properties.setProperty("acks", "all"); - properties.setProperty("retries", "3"); - properties.setProperty("key.serializer", ByteArraySerializer.class.getName()); - properties.setProperty("value.serializer", ByteArraySerializer.class.getName()); - if (txnEnabled) { - properties.setProperty("enable.idempotence", "true"); - properties.setProperty("transactional.id", TaskIdUtils.getRandomId()); - } - - KafkaProducer producer = new KafkaProducer<>( - properties, - new StringSerializer(), - new StringSerializer() - ); - - DateTimeZone zone = DateTimes.inferTzFromString("UTC"); - // format for putting into events - DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); - - DateTime dt = new DateTime(zone); // timestamp to put on events - DateTime dtFirst = dt; // timestamp of 1st event - DateTime dtLast = dt; // timestamp of last event - - // these are used to compute the expected aggregations - int added = 0; - int num_events = 0; - - // send data to kafka - if (txnEnabled) { - producer.initTransactions(); - producer.beginTransaction(); - } - while (num_events < NUM_EVENTS_TO_SEND) { - num_events++; - added += num_events; - // construct the event to send - String event = StringUtils.format(EVENT_TEMPLATE, event_fmt.print(dt), num_events, 0, num_events); - LOG.info("sending event: [%s]", event); - try { - - producer.send(new ProducerRecord<>(TOPIC_NAME, event)).get(); - - } - catch (Exception ioe) { - throw Throwables.propagate(ioe); - } - - dtLast = dt; - dt = new DateTime(zone); - } - if (txnEnabled) { - producer.commitTransaction(); - } - producer.close(); - - LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS); - try { - Thread.sleep(WAIT_TIME_MILLIS); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - - InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE); - if (null == is) { - throw new ISE("could not open query file: %s", QUERIES_FILE); - } - - // put the timestamps into the query structure - String query_response_template; - try { - query_response_template = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", QUERIES_FILE); - } - - String queryStr = query_response_template; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2))); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events)); - - // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - 
LOG.info("Shutting down Kafka Supervisor"); - indexer.shutdownSupervisor(supervisorId); - - // wait for all kafka indexing tasks to finish - LOG.info("Waiting for all kafka indexing tasks to finish"); - ITRetryUtil.retryUntilTrue( - () -> (indexer.getPendingTasks().size() - + indexer.getRunningTasks().size() - + indexer.getWaitingTasks().size()) == 0, - "Waiting for Tasks Completion" - ); - - // wait for segments to be handed off - try { - ITRetryUtil.retryUntil( - () -> coordinator.areSegmentsLoaded(fullDatasourceName), - true, - 10000, - 30, - "Real-time generated segments loaded" - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - LOG.info("segments are present"); - segmentsExist = true; - - // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4 - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - private void addFilteredProperties(IntegrationTestingConfig config, Properties properties) - { - for (Map.Entry entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) { - properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue()); - } - } - } - - void doTearDown() - { - if (config.manageKafkaTopic()) { - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); - } - - // remove segments - if (segmentsExist && fullDatasourceName != null) { - unloadAndKillData(fullDatasourceName); - } - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java new file mode 100644 index 00000000000..ce769bfb59b --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.utils.KafkaAdminClient; +import org.apache.druid.testing.utils.KafkaEventWriter; +import org.apache.druid.testing.utils.StreamAdminClient; +import org.apache.druid.testing.utils.StreamEventWriter; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; + +public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest +{ + protected abstract boolean isKafkaWriterTransactionalEnabled(); + + @Override + StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) + { + return new KafkaAdminClient(config.getKafkaHost()); + } + + @Override + public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) + { + return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled()); + } + + @Override + Function generateStreamIngestionPropsTransform(String streamName, + String fullDatasourceName, + IntegrationTestingConfig config) + { + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); + final Properties consumerProperties = new Properties(); + consumerProperties.putAll(consumerConfigs); + consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost()); + return spec -> { + try { + spec = StringUtils.replace( + spec, + "%%DATASOURCE%%", + fullDatasourceName + ); + spec = StringUtils.replace( + spec, + "%%STREAM_TYPE%%", + "kafka" + ); + spec = StringUtils.replace( + spec, + "%%TOPIC_KEY%%", + "topic" + ); + spec = StringUtils.replace( + spec, + "%%TOPIC_VALUE%%", + streamName + ); + spec = StringUtils.replace( + spec, + "%%USE_EARLIEST_KEY%%", + "useEarliestOffset" + ); + spec = StringUtils.replace( + spec, + "%%STREAM_PROPERTIES_KEY%%", + "consumerProperties" + ); + return StringUtils.replace( + spec, + "%%STREAM_PROPERTIES_VALUE%%", + jsonMapper.writeValueAsString(consumerProperties) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + } + + @Override + Function generateStreamQueryPropsTransform(String streamName, String fullDatasourceName) + { + return spec -> { + try { + spec = StringUtils.replace( + spec, + "%%DATASOURCE%%", + fullDatasourceName + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1)) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_START%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_END%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2)) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_ADDED%%", + Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND) + ); + return StringUtils.replace( + spec, + "%%TIMESERIES_NUMEVENTS%%", + Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND) + ); + } + catch (Exception e) { + throw new 
+ + @Override + Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName) + { + return spec -> { + try { + spec = StringUtils.replace( + spec, + "%%DATASOURCE%%", + fullDatasourceName + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1)) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_START%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_END%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2)) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_ADDED%%", + Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND) + ); + return StringUtils.replace( + spec, + "%%TIMESERIES_NUMEVENTS%%", + Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java new file mode 100644 index 00000000000..14c9cac8ac2 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.utils.KinesisAdminClient; +import org.apache.druid.testing.utils.KinesisEventWriter; +import org.apache.druid.testing.utils.StreamAdminClient; +import org.apache.druid.testing.utils.StreamEventWriter; + +import java.util.function.Function; + +public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest +{ + @Override + StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception + { + return new KinesisAdminClient(config.getStreamEndpoint()); + } + + @Override + StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception + { + return new KinesisEventWriter(config.getStreamEndpoint(), false); + } + + @Override + Function<String, String> generateStreamIngestionPropsTransform(String streamName, + String fullDatasourceName, + IntegrationTestingConfig config) + { + return spec -> { + try { + spec = StringUtils.replace( + spec, + "%%DATASOURCE%%", + fullDatasourceName + ); + spec = StringUtils.replace( + spec, + "%%STREAM_TYPE%%", + "kinesis" + ); + spec = StringUtils.replace( + spec, + "%%TOPIC_KEY%%", + "stream" + ); + spec = StringUtils.replace( + spec, + "%%TOPIC_VALUE%%", + streamName + ); + spec = StringUtils.replace( + spec, + "%%USE_EARLIEST_KEY%%", + "useEarliestSequenceNumber" + ); + spec = StringUtils.replace( + spec, + "%%STREAM_PROPERTIES_KEY%%", + "endpoint" + ); + return StringUtils.replace( + spec, + "%%STREAM_PROPERTIES_VALUE%%", + jsonMapper.writeValueAsString(config.getStreamEndpoint()) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + } + + @Override + Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName) + { + return spec -> { + try { + spec = StringUtils.replace( + spec, + "%%DATASOURCE%%", + fullDatasourceName + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1)) + ); +
spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_START%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_END%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2)) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_ADDED%%", + Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND) + ); + return StringUtils.replace( + spec, + "%%TIMESERIES_NUMEVENTS%%", + Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java new file mode 100644 index 00000000000..2f0c65afaeb --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.tests.indexer; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.utils.DruidClusterAdminClient; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.StreamAdminClient; +import org.apache.druid.testing.utils.StreamEventWriter; +import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.Closeable; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest +{ + static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0); + // format for the querying interval + static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); + // format for the expected timestamp in a query response + static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); + static final int EVENTS_PER_SECOND = 6; + static final int TOTAL_NUMBER_OF_SECOND = 10; + // Since this integration test can terminate or be killed unexpectedly, this tag is added to all streams created + // to help make stream cleanup easier. (Normally, streams should be cleaned up automatically by the teardown method) + // The value of this tag is a timestamp that can be used by a lambda function to remove unused streams.
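+ // The tag value is an epoch-millis timestamp set 30 minutes in the future when the stream is created (see GeneratedTestConfig below).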
+ private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; + private static final int STREAM_SHARD_COUNT = 2; + private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L; + private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json"; + private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json"; + private static final String QUERIES_FILE = "/indexer/stream_index_queries.json"; + private static final long CYCLE_PADDING_MS = 100; + private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class); + + @Inject + private DruidClusterAdminClient druidClusterAdminClient; + + @Inject + private IntegrationTestingConfig config; + + private StreamAdminClient streamAdminClient; + private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator; + + abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception; + abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception; + abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName, + String fullDatasourceName, + IntegrationTestingConfig config); + abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName); + public abstract String getTestNamePrefix(); + + protected void doBeforeClass() throws Exception + { + streamAdminClient = createStreamAdminClient(config); + wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS); + } + + protected void doClassTeardown() + { + wikipediaStreamEventGenerator.shutdown(); + } + + protected void doTestIndexDataWithLegacyParserStableState() throws Exception + { + StreamEventWriter streamEventWriter = createStreamEventWriter(config); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + try ( + final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start data generator + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); + verifyIngestedData(generatedTestConfig); + } + finally { + doMethodTeardown(generatedTestConfig, streamEventWriter); + } + } + + protected void doTestIndexDataWithInputFormatStableState() throws Exception + { + StreamEventWriter streamEventWriter = createStreamEventWriter(config); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + try ( + final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start data generator + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); + verifyIngestedData(generatedTestConfig); + } + finally { + doMethodTeardown(generatedTestConfig,
streamEventWriter); + } + } + + void doTestIndexDataWithLosingCoordinator() throws Exception + { + testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady()); + } + + void doTestIndexDataWithLosingOverlord() throws Exception + { + testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady()); + } + + void doTestIndexDataWithLosingHistorical() throws Exception + { + testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady()); + } + + protected void doTestIndexDataWithStartStopSupervisor() throws Exception + { + StreamEventWriter streamEventWriter = createStreamEventWriter(config); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + try ( + final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start generating half of the data + int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; + int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + // Verify supervisor is healthy before suspension + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Suspend the supervisor + indexer.suspendSupervisor(generatedTestConfig.getSupervisorId()); + // Start generating remaining half of the data + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + // Resume the supervisor + indexer.resumeSupervisor(generatedTestConfig.getSupervisorId()); + // Verify supervisor is healthy after suspension + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Verify that supervisor can catch up with the stream + verifyIngestedData(generatedTestConfig); + } + finally { + doMethodTeardown(generatedTestConfig, streamEventWriter); + } + } + + protected void doTestIndexDataWithStreamReshardSplit() throws Exception + { + // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2 + testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2); + } + + protected void doTestIndexDataWithStreamReshardMerge() throws Exception + { + // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2 + testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2); + } + + private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception + { + StreamEventWriter streamEventWriter = createStreamEventWriter(config); + final GeneratedTestConfig
generatedTestConfig = new GeneratedTestConfig(); + try ( + final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start generating one third of the data (before restarting) + int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; + int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + // Verify supervisor is healthy before restart + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Restart Druid process + LOG.info("Restarting Druid process"); + restartRunnable.run(); + LOG.info("Restarted Druid process"); + // Start generating one third of the data (while restarting) + int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + // Wait for Druid process to be available + LOG.info("Waiting for Druid process to be available"); + waitForReadyRunnable.run(); + LOG.info("Druid process is now available"); + // Start generating remaining data (after restarting) + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); + // Verify supervisor is healthy + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Verify that supervisor ingested all data + verifyIngestedData(generatedTestConfig); + } + finally { + doMethodTeardown(generatedTestConfig, streamEventWriter); + } + } + + private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception + { + StreamEventWriter streamEventWriter = createStreamEventWriter(config); + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(); + try ( + final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName()) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start generating one third of the data (before resharding) + int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; + int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), 
streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); + // Verify supervisor is healthy before resharding + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Reshard the stream from STREAM_SHARD_COUNT to newShardCount and wait until the resharding starts + streamAdminClient.updatePartitionCount(generatedTestConfig.getStreamName(), newShardCount, true); + // Start generating one third of the data (while resharding) + int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); + // Wait for stream to finish resharding + ITRetryUtil.retryUntil( + () -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()), + true, + 10000, + 30, + "Waiting for stream to finish resharding" + ); + ITRetryUtil.retryUntil( + () -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount), + true, + 10000, + 30, + "Waiting for stream to finish resharding" + ); + // Start generating remaining data (after resharding) + wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); + // Verify supervisor is healthy after resharding + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + // Verify that supervisor can catch up with the stream + verifyIngestedData(generatedTestConfig); + } + finally { + doMethodTeardown(generatedTestConfig, streamEventWriter); + } + } + + private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception + { + // Wait for supervisor to consume events + LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS); + Thread.sleep(WAIT_TIME_MILLIS); + // Query data + final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE)); + // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing tasks + this.queryHelper.testQueriesFromString(querySpec, 2); + LOG.info("Shutting down supervisor"); + indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + // wait for all indexing tasks to finish + LOG.info("Waiting for all indexing tasks to finish"); + ITRetryUtil.retryUntilTrue( + () -> (indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() == 0), + "Waiting for Tasks Completion" + ); + // wait for segments to be handed off + ITRetryUtil.retryUntil( + () -> coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()), + true, + 10000, + 30, + "Real-time generated segments loaded" + ); + + // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4 + this.queryHelper.testQueriesFromString(querySpec, 2); + }
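+ + // Sum of the sequence 1 + 2 + ... + numEvents. The query-props transforms use this to compute the expected + // total of the "added" metric, on the assumption that the generator emits events carrying the sequence values + // 1..numEvents during each second of generated data.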
+ long getSumOfEventSequence(int numEvents) + { + return (numEvents * (1 + numEvents)) / 2; + } + + private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter) + { + try { + streamEventWriter.flush(); + streamEventWriter.shutdown(); + } + catch (Exception e) { + // Best effort cleanup as the writer may have already been cleaned up + LOG.warn(e, "Failed to cleanup writer. This might be expected depending on the test method"); + } + try { + indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + } + catch (Exception e) { + // Best effort cleanup as the supervisor may have already been cleaned up + LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method"); + } + try { + unloader(generatedTestConfig.getFullDatasourceName()); + } + catch (Exception e) { + // Best effort cleanup as the datasource may have already been cleaned up + LOG.warn(e, "Failed to cleanup datasource. This might be expected depending on the test method"); + } + try { + streamAdminClient.deleteStream(generatedTestConfig.getStreamName()); + } + catch (Exception e) { + // Best effort cleanup as the stream may have already been cleaned up + LOG.warn(e, "Failed to cleanup stream. This might be expected depending on the test method"); + } + } + + private class GeneratedTestConfig + { + private String streamName; + private String fullDatasourceName; + private String supervisorId; + private Function<String, String> streamIngestionPropsTransform; + private Function<String, String> streamQueryPropsTransform; + + GeneratedTestConfig() throws Exception + { + streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID(); + String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID(); + Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis())); + streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags); + ITRetryUtil.retryUntil( + () -> streamAdminClient.isStreamActive(streamName), + true, + 10000, + 30, + "Wait for stream active" + ); + fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix(); + streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config); + streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName); + } + + public String getSupervisorId() + { + return supervisorId; + } + + public void setSupervisorId(String supervisorId) + { + this.supervisorId = supervisorId; + } + + public String getStreamName() + { + return streamName; + } + + public String getFullDatasourceName() + { + return fullDatasourceName; + } + + public Function<String, String> getStreamIngestionPropsTransform() + { + return streamIngestionPropsTransform; + } + + public Function<String, String> getStreamQueryPropsTransform() + { + return streamQueryPropsTransform; + } + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java new file mode 100644 index 00000000000..99713a72984 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.KAFKA_INDEX_SLOW) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest +{ + @Override + protected boolean isKafkaWriterTransactionalEnabled() + { + return false; + } + + @Override + public String getTestNamePrefix() + { + return "kafka_nontransactional_serialized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingCoordinator() throws Exception + { + doTestIndexDataWithLosingCoordinator(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingOverlord() throws Exception + { + doTestIndexDataWithLosingOverlord(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingHistorical() throws Exception + { + doTestIndexDataWithLosingHistorical(); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java deleted file mode 100644 index 30e4bab23e1..00000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.apache.druid.tests.indexer; - -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.tests.TestNGGroup; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -@Test(groups = TestNGGroup.KAFKA_INDEX) -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest -{ - private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class); - private static final String DATASOURCE = "kafka_indexing_service_test"; - - @DataProvider - public static Object[][] testParams() - { - return new Object[][]{ - {"legacy_parser"}, - {"input_format"} - }; - } - - @Test(dataProvider = "testParams") - public void testKafka(String param) - { - final String supervisorSpecPath = "legacy_parser".equals(param) - ? INDEXER_FILE_LEGACY_PARSER - : INDEXER_FILE_INPUT_FORMAT; - LOG.info("Starting test: ITKafkaIndexingServiceTest"); - doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, false); - } - - @AfterMethod - public void afterClass() - { - LOG.info("teardown"); - doTearDown(); - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java new file mode 100644 index 00000000000..06bcf050d67 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest +{ + @Override + protected boolean isKafkaWriterTransactionalEnabled() + { + return true; + } + + @Override + public String getTestNamePrefix() + { + return "kafka_transactional_serialized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingCoordinator() throws Exception + { + doTestIndexDataWithLosingCoordinator(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingOverlord() throws Exception + { + doTestIndexDataWithLosingOverlord(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKafkaIndexDataWithLosingHistorical() throws Exception + { + doTestIndexDataWithLosingHistorical(); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java deleted file mode 100644 index f32b82433f1..00000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.apache.druid.tests.indexer; - -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.tests.TestNGGroup; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -/** - * This is a test for the Kafka indexing service with transactional topics - */ -@Test(groups = TestNGGroup.KAFKA_INDEX) -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest -{ - private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class); - private static final String DATASOURCE = "kafka_indexing_service_txn_test"; - - @DataProvider - public static Object[][] testParams() - { - return new Object[][]{ - {"legacy_parser"}, - {"input_format"} - }; - } - - @Test(dataProvider = "testParams") - public void testKafka(String param) - { - final String supervisorSpecPath = "legacy_parser".equals(param) - ? INDEXER_FILE_LEGACY_PARSER - : INDEXER_FILE_INPUT_FORMAT; - LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest"); - doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, true); - } - - @AfterMethod - public void afterClass() - { - LOG.info("teardown"); - doTearDown(); - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java new file mode 100644 index 00000000000..8e64abb6556 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.KINESIS_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndexingServiceTest +{ + @Override + public String getTestNamePrefix() + { + return "kinesis_serialized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKinesisIndexDataWithLosingCoordinator() throws Exception + { + doTestIndexDataWithLosingCoordinator(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKinesisIndexDataWithLosingOverlord() throws Exception + { + doTestIndexDataWithLosingOverlord(); + } + + /** + * This test must be run individually since the test affects and modifies the state of the Druid cluster + */ + @Test + public void testKinesisIndexDataWithLosingHistorical() throws Exception + { + doTestIndexDataWithLosingHistorical(); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java deleted file mode 100644 index b539b5d547c..00000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java +++ /dev/null @@ -1,480 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.apache.druid.tests.indexer; - -import com.google.common.collect.ImmutableMap; -import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.DruidClusterAdminClient; -import org.apache.druid.testing.utils.ITRetryUtil; -import org.apache.druid.testing.utils.KinesisAdminClient; -import org.apache.druid.testing.utils.KinesisEventWriter; -import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator; -import org.apache.druid.tests.TestNGGroup; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import java.io.Closeable; -import java.util.Map; -import java.util.UUID; -import java.util.function.Function; - -@Test(groups = TestNGGroup.KINESIS_INDEX) -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest -{ - private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class); - private static final int KINESIS_SHARD_COUNT = 2; - // Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created - // to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method) - // The value to this tag is a timestamp that can be used by a lambda function to remove unused stream. 
- private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; - private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L; - private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0); - private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json"; - private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json"; - private static final String QUERIES_FILE = "/indexer/stream_index_queries.json"; - // format for the querying interval - private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); - // format for the expected timestamp in a query response - private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); - private static final int EVENTS_PER_SECOND = 6; - private static final long CYCLE_PADDING_MS = 100; - private static final int TOTAL_NUMBER_OF_SECOND = 10; - - @Inject - private DruidClusterAdminClient druidClusterAdminClient; - - private String streamName; - private String fullDatasourceName; - private KinesisAdminClient kinesisAdminClient; - private KinesisEventWriter kinesisEventWriter; - private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator; - private Function kinesisIngestionPropsTransform; - private Function kinesisQueryPropsTransform; - private String supervisorId; - private int secondsToGenerateRemaining; - - @BeforeClass - public void beforeClass() throws Exception - { - kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint()); - kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false); - wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS); - } - - @AfterClass - public void tearDown() - { - wikipediaStreamEventGenerator.shutdown(); - kinesisEventWriter.shutdown(); - } - - @BeforeMethod - public void before() - { - streamName = "kinesis_index_test_" + UUID.randomUUID(); - String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID(); - Map tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis())); - kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags); - ITRetryUtil.retryUntil( - () -> kinesisAdminClient.isStreamActive(streamName), - true, - 10000, - 30, - "Wait for stream active" - ); - secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; - fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix(); - kinesisIngestionPropsTransform = spec -> { - try { - spec = StringUtils.replace( - spec, - "%%DATASOURCE%%", - fullDatasourceName - ); - spec = StringUtils.replace( - spec, - "%%STREAM_TYPE%%", - "kinesis" - ); - spec = StringUtils.replace( - spec, - "%%TOPIC_KEY%%", - "stream" - ); - spec = StringUtils.replace( - spec, - "%%TOPIC_VALUE%%", - streamName - ); - spec = StringUtils.replace( - spec, - "%%USE_EARLIEST_KEY%%", - "useEarliestSequenceNumber" - ); - spec = StringUtils.replace( - spec, - "%%STREAM_PROPERTIES_KEY%%", - "endpoint" - ); - return StringUtils.replace( - spec, - "%%STREAM_PROPERTIES_VALUE%%", - jsonMapper.writeValueAsString(config.getStreamEndpoint()) - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - }; - kinesisQueryPropsTransform = spec -> { - try { - spec = StringUtils.replace( - spec, - "%%DATASOURCE%%", - fullDatasourceName - ); - spec = StringUtils.replace( - spec, - 
"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", - TIMESTAMP_FMT.print(FIRST_EVENT_TIME) - ); - spec = StringUtils.replace( - spec, - "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", - TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1)) - ); - spec = StringUtils.replace( - spec, - "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", - TIMESTAMP_FMT.print(FIRST_EVENT_TIME) - ); - spec = StringUtils.replace( - spec, - "%%TIMESERIES_QUERY_START%%", - INTERVAL_FMT.print(FIRST_EVENT_TIME) - ); - spec = StringUtils.replace( - spec, - "%%TIMESERIES_QUERY_END%%", - INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2)) - ); - spec = StringUtils.replace( - spec, - "%%TIMESERIES_RESPONSE_TIMESTAMP%%", - TIMESTAMP_FMT.print(FIRST_EVENT_TIME) - ); - spec = StringUtils.replace( - spec, - "%%TIMESERIES_ADDED%%", - Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND) - ); - return StringUtils.replace( - spec, - "%%TIMESERIES_NUMEVENTS%%", - Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND) - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - }; - } - - @AfterMethod - public void teardown() - { - try { - kinesisEventWriter.flush(); - indexer.shutdownSupervisor(supervisorId); - } - catch (Exception e) { - // Best effort cleanup as the supervisor may have already went Bye-Bye - } - try { - unloader(fullDatasourceName); - } - catch (Exception e) { - // Best effort cleanup as the datasource may have already went Bye-Bye - } - try { - kinesisAdminClient.deleteStream(streamName); - } - catch (Exception e) { - // Best effort cleanup as the stream may have already went Bye-Bye - } - } - - @Test - public void testKinesisIndexDataWithLegacyParserStableState() throws Exception - { - try ( - final Closeable ignored1 = unloader(fullDatasourceName) - ) { - final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - supervisorId = indexer.submitSupervisor(taskSpec); - LOG.info("Submitted supervisor"); - // Start Kinesis data generator - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); - verifyIngestedData(supervisorId); - } - } - - @Test - public void testKinesisIndexDataWithInputFormatStableState() throws Exception - { - try ( - final Closeable ignored1 = unloader(fullDatasourceName) - ) { - final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - supervisorId = indexer.submitSupervisor(taskSpec); - LOG.info("Submitted supervisor"); - // Start Kinesis data generator - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); - verifyIngestedData(supervisorId); - } - } - - @Test - public void testKinesisIndexDataWithLosingCoordinator() throws Exception - { - testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady()); - } - - @Test - public void testKinesisIndexDataWithLosingOverlord() throws Exception - { - testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady()); - } - - @Test - public void testKinesisIndexDataWithLosingHistorical() throws Exception - { - testIndexWithLosingNodeHelper(() -> 
druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady()); - } - - @Test - public void testKinesisIndexDataWithStartStopSupervisor() throws Exception - { - try ( - final Closeable ignored1 = unloader(fullDatasourceName) - ) { - final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - supervisorId = indexer.submitSupervisor(taskSpec); - LOG.info("Submitted supervisor"); - // Start generating half of the data - int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2; - secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); - // Verify supervisor is healthy before suspension - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Suspend the supervisor - indexer.suspendSupervisor(supervisorId); - // Start generating remainning half of the data - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); - // Resume the supervisor - indexer.resumeSupervisor(supervisorId); - // Verify supervisor is healthy after suspension - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Verify that supervisor can catch up with the stream - verifyIngestedData(supervisorId); - } - } - - @Test - public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception - { - // Reshard the supervisor by split from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2 - testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2); - } - - @Test - public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception - { - // Reshard the supervisor by split from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2 - testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2); - } - - private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception - { - try ( - final Closeable ignored1 = unloader(fullDatasourceName) - ) { - final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - supervisorId = indexer.submitSupervisor(taskSpec); - LOG.info("Submitted supervisor"); - // Start generating one third of the data (before restarting) - int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; - secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); - // Verify supervisor is healthy before restart - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Restart Druid process - LOG.info("Restarting Druid process"); - restartRunnable.run(); - LOG.info("Restarted Druid process"); - // Start generating one third of the data (while restarting) - int 
secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; - secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); - // Wait for Druid process to be available - LOG.info("Waiting for Druid process to be available"); - waitForReadyRunnable.run(); - LOG.info("Druid process is now available"); - // Start generating remainding data (after restarting) - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); - // Verify supervisor is healthy - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Verify that supervisor ingested all data - verifyIngestedData(supervisorId); - } - } - - private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception - { - try ( - final Closeable ignored1 = unloader(fullDatasourceName) - ) { - final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); - LOG.info("supervisorSpec: [%s]\n", taskSpec); - // Start supervisor - supervisorId = indexer.submitSupervisor(taskSpec); - LOG.info("Submitted supervisor"); - // Start generating one third of the data (before resharding) - int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3; - secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME); - // Verify supervisor is healthy before resahrding - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Reshard the supervisor by split from KINESIS_SHARD_COUNT to newShardCount and waits until the resharding starts - kinesisAdminClient.updateShardCount(streamName, newShardCount, true); - // Start generating one third of the data (while resharding) - int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3; - secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound; - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)); - // Wait for kinesis stream to finish resharding - ITRetryUtil.retryUntil( - () -> kinesisAdminClient.isStreamActive(streamName), - true, - 10000, - 30, - "Waiting for Kinesis stream to finish resharding" - ); - // Start generating remainding data (after resharding) - wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)); - // Verify supervisor is healthy after resahrding - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)), - true, - 10000, - 30, - "Waiting for supervisor to be healthy" - ); - // Verify that supervisor can catch up with the stream - verifyIngestedData(supervisorId); - } - } - - private void verifyIngestedData(String supervisorId) throws Exception - { - // Wait for supervisor to consume 
events - LOG.info("Waiting for [%s] millis for Kinesis indexing tasks to consume events", WAIT_TIME_MILLIS); - Thread.sleep(WAIT_TIME_MILLIS); - // Query data - final String querySpec = kinesisQueryPropsTransform.apply(getResourceAsString(QUERIES_FILE)); - // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing - this.queryHelper.testQueriesFromString(querySpec, 2); - LOG.info("Shutting down supervisor"); - indexer.shutdownSupervisor(supervisorId); - // wait for all Kinesis indexing tasks to finish - LOG.info("Waiting for all indexing tasks to finish"); - ITRetryUtil.retryUntilTrue( - () -> (indexer.getPendingTasks().size() - + indexer.getRunningTasks().size() - + indexer.getWaitingTasks().size()) == 0, - "Waiting for Tasks Completion" - ); - // wait for segments to be handed off - ITRetryUtil.retryUntil( - () -> coordinator.areSegmentsLoaded(fullDatasourceName), - true, - 10000, - 30, - "Real-time generated segments loaded" - ); - - // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4 - this.queryHelper.testQueriesFromString(querySpec, 2); - } - private long getSumOfEventSequence(int numEvents) - { - return (numEvents * (1 + numEvents)) / 2; - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java new file mode 100644 index 00000000000..199530e0a32 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java new file mode 100644 index 00000000000..199530e0a32 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.KAFKA_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest +{ + @Override + protected boolean isKafkaWriterTransactionalEnabled() + { + return false; + } + + @Override + public String getTestNamePrefix() + { + return "kafka_non_transactional_parallelized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithLegacyParserStableState() throws Exception + { + doTestIndexDataWithLegacyParserStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithInputFormatStableState() throws Exception + { + doTestIndexDataWithInputFormatStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithStartStopSupervisor() throws Exception + { + doTestIndexDataWithStartStopSupervisor(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception + { + doTestIndexDataWithStreamReshardSplit(); + } +}
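The only functional difference between this class and the transactional variant that follows is the isKafkaWriterTransactionalEnabled() flag consumed by the patch's Kafka writer. As a hedged illustration of what such a flag plausibly toggles (a sketch against the standard Kafka producer API; the actual KafkaEventWriter added by this patch may be organized differently, and the bootstrap address and transactional id here are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConditionallyTransactionalWriter
{
  public static void writeOne(String bootstrapServers, String topic, String event, boolean transactionsEnabled)
  {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    if (transactionsEnabled) {
      // Kafka transactions require idempotence and a unique transactional id.
      props.setProperty("enable.idempotence", "true");
      props.setProperty("transactional.id", "druid-it-writer"); // hypothetical id
    }
    try (KafkaProducer<String, String> producer =
             new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
      if (transactionsEnabled) {
        // Events written inside a transaction only become visible to
        // read_committed consumers once the transaction commits.
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>(topic, event));
        producer.commitTransaction();
      } else {
        producer.send(new ProducerRecord<>(topic, event));
        producer.flush();
      }
    }
  }
}

Running the same ingestion tests in both modes exercises Druid's Kafka indexing service against both plain appends and transactionally committed batches.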
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java new file mode 100644 index 00000000000..7db3a7fd832 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest +{ + @Override + protected boolean isKafkaWriterTransactionalEnabled() + { + return true; + } + + @Override + public String getTestNamePrefix() + { + return "kafka_transactional_parallelized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithLegacyParserStableState() throws Exception + { + doTestIndexDataWithLegacyParserStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithInputFormatStableState() throws Exception + { + doTestIndexDataWithInputFormatStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithStartStopSupervisor() throws Exception + { + doTestIndexDataWithStartStopSupervisor(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception + { + doTestIndexDataWithStreamReshardSplit(); + } +}
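Each concrete class supplies its own getTestNamePrefix(), and the method javadocs stress that every test owns a uniquely named datasource and supervisor. A minimal sketch of how per-test resource names might be derived from that prefix (a hypothetical helper, shown only to make the parallelization story concrete; the real AbstractStreamIndexingTest may use a different scheme):

import java.util.UUID;

public class TestResourceNames
{
  // Build a name that cannot collide with names generated by other test
  // methods running concurrently in the same TestNG suite.
  public static String uniqueName(String testNamePrefix)
  {
    return testNamePrefix + "_" + UUID.randomUUID();
  }
}

Because names like kafka_transactional_parallelized_<uuid> never overlap, TestNG can schedule these classes' methods in parallel without the tests interfering through shared topics, streams, or datasources.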
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java new file mode 100644 index 00000000000..38816dc1328 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = TestNGGroup.KINESIS_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest +{ + @Override + public String getTestNamePrefix() + { + return "kinesis_parallelized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + @AfterClass + public void tearDown() + { + doClassTeardown(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKinesisIndexDataWithLegacyParserStableState() throws Exception + { + doTestIndexDataWithLegacyParserStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKinesisIndexDataWithInputFormatStableState() throws Exception + { + doTestIndexDataWithInputFormatStableState(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKinesisIndexDataWithStartStopSupervisor() throws Exception + { + doTestIndexDataWithStartStopSupervisor(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception + { + doTestIndexDataWithStreamReshardSplit(); + } + + /** + * This test can run concurrently with other tests because it creates, modifies, and tears down a unique + * datasource and supervisor that are scoped to this test only. + */ + @Test + public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception + { + doTestIndexDataWithStreamReshardMerge(); + } +} diff --git a/integration-tests/src/test/resources/testng.xml b/integration-tests/src/test/resources/testng.xml index 333029c363c..5a0735a0578 100644 --- a/integration-tests/src/test/resources/testng.xml +++ b/integration-tests/src/test/resources/testng.xml @@ -21,14 +21,21 @@ - - - - - - - - - - + + + + + + + + + + + + + + + + +
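The reshard split and merge tests above ultimately go through kinesisAdminClient.updateShardCount(streamName, newShardCount, true), a wrapper around Kinesis's UpdateShardCount operation. A hedged sketch of that call using the AWS SDK for Java v1 (assuming that is the SDK behind KinesisAdminClient; the stream name and shard count below are illustrative):

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

public class KinesisReshardSketch
{
  public static void main(String[] args)
  {
    AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
    kinesis.updateShardCount(
        new UpdateShardCountRequest()
            .withStreamName("example-stream")   // illustrative stream name
            .withTargetShardCount(4)            // e.g. a split from 2 to 4 shards
            .withScalingType(ScalingType.UNIFORM_SCALING)
    );
    // Resharding is asynchronous: the stream transitions to UPDATING, and the
    // tests keep writing while polling until it reports ACTIVE again.
  }
}

This is why the tests generate data in three rounds (before, during, and after the reshard): the interesting behavior is whether the supervisor stays healthy and loses no events while the shard map changes underneath it.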
diff --git a/integration-tests/stop_cluster.sh b/integration-tests/stop_cluster.sh index 4ce4268806d..2828a0ff8a9 100755 --- a/integration-tests/stop_cluster.sh +++ b/integration-tests/stop_cluster.sh @@ -14,6 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Skip stopping docker if flag set (for use during development) +if [ "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" = true ] +then + exit 0 +fi + for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop; do diff --git a/pom.xml b/pom.xml index 4ab46a8884f..6c98e55ff6e 100644 --- a/pom.xml +++ b/pom.xml @@ -1196,7 +1196,7 @@ <groupId>org.testng</groupId> <artifactId>testng</artifactId> - <version>6.8.7</version> + <version>6.14.3</version> <exclusions> <exclusion> <groupId>org.hamcrest</groupId> @@ -1253,7 +1253,6 @@ <exclude>org/apache/druid/benchmark/**/*</exclude> <exclude>org/apache/druid/**/*Benchmark*</exclude> - <exclude>org/testng/DruidTestRunnerFactory*</exclude> <exclude>org/apache/druid/testing/**/*</exclude>
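Usage note (an assumed workflow rather than anything stated in the patch): a developer iterating locally can run DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER=true ./stop_cluster.sh, or export the variable before the test scripts invoke it, so the script exits before stopping any containers and the Docker cluster survives between runs; leaving the variable unset keeps the normal teardown behavior.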