mirror of https://github.com/apache/druid.git
Add integration tests for kafka ingestion (#9724)
* add kafka admin and kafka writer * refactor kinesis IT * fix typo refactor * parallel * parallel * parallel * parallel works now * add kafka it * add doc to readme * fix tests * fix failing test * test * test * test * test * address comments * addressed comments
This commit is contained in:
parent
479c290fb9
commit
16f5ae4405
33
.travis.yml
33
.travis.yml
|
@ -313,6 +313,30 @@ jobs:
|
|||
script: *run_integration_test
|
||||
after_failure: *integration_test_diags
|
||||
|
||||
- &integration_kafka_index_slow
|
||||
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow"
|
||||
jdk: openjdk8
|
||||
services: *integration_test_services
|
||||
env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
|
||||
script: *run_integration_test
|
||||
after_failure: *integration_test_diags
|
||||
|
||||
- &integration_kafka_transactional_index
|
||||
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test"
|
||||
jdk: openjdk8
|
||||
services: *integration_test_services
|
||||
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8'
|
||||
script: *run_integration_test
|
||||
after_failure: *integration_test_diags
|
||||
|
||||
- &integration_kafka_transactional_index_slow
|
||||
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow"
|
||||
jdk: openjdk8
|
||||
services: *integration_test_services
|
||||
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
|
||||
script: *run_integration_test
|
||||
after_failure: *integration_test_diags
|
||||
|
||||
- &integration_query
|
||||
name: "(Compile=openjdk8, Run=openjdk8) query integration test"
|
||||
jdk: openjdk8
|
||||
|
@ -341,7 +365,7 @@ jobs:
|
|||
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
|
||||
jdk: openjdk8
|
||||
services: *integration_test_services
|
||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=8'
|
||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
|
||||
script: *run_integration_test
|
||||
after_failure: *integration_test_diags
|
||||
# END - Integration tests for Compile with Java 8 and Run with Java 8
|
||||
|
@ -357,11 +381,6 @@ jobs:
|
|||
jdk: openjdk8
|
||||
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11'
|
||||
|
||||
- <<: *integration_kafka_index
|
||||
name: "(Compile=openjdk8, Run=openjdk11) kafka index integration test"
|
||||
jdk: openjdk8
|
||||
env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=11'
|
||||
|
||||
- <<: *integration_query
|
||||
name: "(Compile=openjdk8, Run=openjdk11) query integration test"
|
||||
jdk: openjdk8
|
||||
|
@ -380,7 +399,7 @@ jobs:
|
|||
- <<: *integration_tests
|
||||
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
|
||||
jdk: openjdk8
|
||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=11'
|
||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
|
||||
# END - Integration tests for Compile with Java 8 and Run with Java 11
|
||||
|
||||
- name: "security vulnerabilities"
|
||||
|
|
|
@ -68,7 +68,18 @@ can either be 8 or 11.
|
|||
Druid's configuration (using Docker) can be overrided by providing -Doverride.config.path=<PATH_TO_FILE>.
|
||||
The file must contain one property per line, the key must start with `druid_` and the format should be snake case.
|
||||
|
||||
## Debugging Druid while running tests
|
||||
## Tips & tricks for debugging and developing integration tests
|
||||
|
||||
### Useful mvn command flags
|
||||
|
||||
- -Dskip.start.docker=true to skip starting docker containers. This can save ~3 minutes by skipping building and bringing
|
||||
up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc). Please make sure that you actually do have
|
||||
these containers already running if using this flag. Additionally, please make sure that the running containers
|
||||
are in the same state that the setup script (run_cluster.sh) would have brought it up in.
|
||||
- -Dskip.stop.docker=true to skip stopping and teardowning down the docker containers. This can be useful in further
|
||||
debugging after the integration tests have finish running.
|
||||
|
||||
### Debugging Druid while running tests
|
||||
|
||||
For your convenience, Druid processes running inside Docker have debugging enabled and the following ports have
|
||||
been made available to attach your remote debugger (such as via IntelliJ IDEA's Remote Configuration):
|
||||
|
@ -303,3 +314,13 @@ This will tell the test framework that the test class needs to be constructed us
|
|||
2) FromFileTestQueryHelper - reads queries with expected results from file and executes them and verifies the results using ResultVerifier
|
||||
|
||||
Refer ITIndexerTest as an example on how to use dependency Injection
|
||||
|
||||
### Running test methods in parallel
|
||||
By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test
|
||||
class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
|
||||
the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
|
||||
test tag section in integration-tests/src/test/resources/testng.xml
|
||||
Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with
|
||||
other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers,
|
||||
test does not use excessive memory starving other concurent task, test does not modify and/or use other task,
|
||||
supervisor, datasource it did not create.
|
||||
|
|
|
@ -233,11 +233,6 @@
|
|||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.101tec</groupId>
|
||||
<artifactId>zkclient</artifactId>
|
||||
<version>0.10</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.validation</groupId>
|
||||
<artifactId>validation-api</artifactId>
|
||||
|
@ -304,21 +299,6 @@
|
|||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<version>${apache.kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>log4j</artifactId>
|
||||
<groupId>log4j</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -367,6 +347,8 @@
|
|||
<id>integration-tests</id>
|
||||
<properties>
|
||||
<start.hadoop.docker>false</start.hadoop.docker>
|
||||
<skip.start.docker>false</skip.start.docker>
|
||||
<skip.stop.docker>false</skip.stop.docker>
|
||||
<override.config.path></override.config.path>
|
||||
<resource.file.dir.path></resource.file.dir.path>
|
||||
</properties>
|
||||
|
@ -385,6 +367,7 @@
|
|||
<configuration>
|
||||
<environmentVariables>
|
||||
<DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>${start.hadoop.docker}</DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>
|
||||
<DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>${skip.start.docker}</DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>
|
||||
<DRUID_INTEGRATION_TEST_JVM_RUNTIME>${jvm.runtime}</DRUID_INTEGRATION_TEST_JVM_RUNTIME>
|
||||
<DRUID_INTEGRATION_TEST_GROUP>${groups}</DRUID_INTEGRATION_TEST_GROUP>
|
||||
<DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>${override.config.path}</DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>
|
||||
|
@ -400,6 +383,9 @@
|
|||
</goals>
|
||||
<phase>post-integration-test</phase>
|
||||
<configuration>
|
||||
<environmentVariables>
|
||||
<DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>${skip.stop.docker}</DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>
|
||||
</environmentVariables>
|
||||
<executable>${project.basedir}/stop_cluster.sh</executable>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
@ -419,12 +405,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<properties>
|
||||
<property>
|
||||
<name>testrunfactory</name>
|
||||
<value>org.testng.DruidTestRunnerFactory</value>
|
||||
</property>
|
||||
</properties>
|
||||
<argLine>
|
||||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
|
@ -477,12 +457,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<properties>
|
||||
<property>
|
||||
<name>testrunfactory</name>
|
||||
<value>org.testng.DruidTestRunnerFactory</value>
|
||||
</property>
|
||||
</properties>
|
||||
<argLine>
|
||||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
|
|
|
@ -14,6 +14,12 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Skip starting docker if flag set (For use during development)
|
||||
if [ -n "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" == true ]
|
||||
then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Cleanup old images/containers
|
||||
{
|
||||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
|
|||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -156,6 +157,15 @@ public class OverlordResourceTestClient
|
|||
return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
|
||||
}
|
||||
|
||||
public List<TaskResponseObject> getUncompletedTasksForDataSource(final String dataSource)
|
||||
{
|
||||
List<TaskResponseObject> uncompletedTasks = new ArrayList<>();
|
||||
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=pending&datasource=%s", StringUtils.urlEncode(dataSource))));
|
||||
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=running&datasource=%s", StringUtils.urlEncode(dataSource))));
|
||||
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=waiting&datasource=%s", StringUtils.urlEncode(dataSource))));
|
||||
return uncompletedTasks;
|
||||
}
|
||||
|
||||
private List<TaskResponseObject> getTasks(String identifier)
|
||||
{
|
||||
try {
|
||||
|
|
|
@ -57,7 +57,6 @@ public class DruidTestModuleFactory implements IModuleFactory
|
|||
@Override
|
||||
public Module createModule(ITestContext context, Class<?> testClass)
|
||||
{
|
||||
context.addGuiceModule(DruidTestModule.class, MODULE);
|
||||
context.addInjector(Collections.singletonList(MODULE), INJECTOR);
|
||||
return MODULE;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.CreatePartitionsResult;
|
||||
import org.apache.kafka.clients.admin.CreateTopicsResult;
|
||||
import org.apache.kafka.clients.admin.DeleteTopicsResult;
|
||||
import org.apache.kafka.clients.admin.DescribeTopicsResult;
|
||||
import org.apache.kafka.clients.admin.NewPartitions;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class KafkaAdminClient implements StreamAdminClient
|
||||
{
|
||||
private AdminClient adminClient;
|
||||
|
||||
public KafkaAdminClient(String kafkaInternalHost)
|
||||
{
|
||||
Properties config = new Properties();
|
||||
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost);
|
||||
adminClient = AdminClient.create(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception
|
||||
{
|
||||
final short replicationFactor = 1;
|
||||
final NewTopic newTopic = new NewTopic(streamName, partitionCount, replicationFactor);
|
||||
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
|
||||
// Wait for create topic to compelte
|
||||
createTopicsResult.values().get(streamName).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStream(String streamName) throws Exception
|
||||
{
|
||||
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableList.of(streamName));
|
||||
deleteTopicsResult.values().get(streamName).get();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method can only increase the partition count of {@param streamName} to have a final partition
|
||||
* count of {@param newPartitionCount}
|
||||
* If {@param blocksUntilStarted} is set to true, then this method will blocks until the partitioning
|
||||
* started (but not nessesary finished), otherwise, the method will returns right after issue the
|
||||
* repartitioning command
|
||||
*/
|
||||
@Override
|
||||
public void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception
|
||||
{
|
||||
Map<String, NewPartitions> counts = new HashMap<>();
|
||||
counts.put(streamName, NewPartitions.increaseTo(newPartitionCount));
|
||||
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(counts);
|
||||
if (blocksUntilStarted) {
|
||||
createPartitionsResult.values().get(streamName).get();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream state such as active/non-active does not applies to Kafka.
|
||||
* Returning true since Kafka stream is always active and can always be writen and read to.
|
||||
*/
|
||||
@Override
|
||||
public boolean isStreamActive(String streamName)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamPartitionCount(String streamName) throws Exception
|
||||
{
|
||||
DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(streamName));
|
||||
TopicDescription topicDescription = result.values().get(streamName).get();
|
||||
return topicDescription.partitions().size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception
|
||||
{
|
||||
return getStreamPartitionCount(streamName) == newPartitionCount;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import org.apache.druid.indexer.TaskIdUtils;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class KafkaEventWriter implements StreamEventWriter
|
||||
{
|
||||
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
|
||||
private final KafkaProducer<String, String> producer;
|
||||
private final boolean txnEnabled;
|
||||
private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();
|
||||
|
||||
public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
|
||||
{
|
||||
Properties properties = new Properties();
|
||||
addFilteredProperties(config, properties);
|
||||
properties.setProperty("bootstrap.servers", config.getKafkaHost());
|
||||
properties.setProperty("acks", "all");
|
||||
properties.setProperty("retries", "3");
|
||||
properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
|
||||
properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
|
||||
this.txnEnabled = txnEnabled;
|
||||
if (txnEnabled) {
|
||||
properties.setProperty("enable.idempotence", "true");
|
||||
properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
|
||||
}
|
||||
this.producer = new KafkaProducer<>(
|
||||
properties,
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
if (txnEnabled) {
|
||||
producer.initTransactions();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTransactionEnabled()
|
||||
{
|
||||
return txnEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initTransaction()
|
||||
{
|
||||
if (txnEnabled) {
|
||||
producer.beginTransaction();
|
||||
} else {
|
||||
throw new IllegalStateException("Kafka writer was initialized with transaction disabled");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitTransaction()
|
||||
{
|
||||
if (txnEnabled) {
|
||||
producer.commitTransaction();
|
||||
} else {
|
||||
throw new IllegalStateException("Kafka writer was initialized with transaction disabled");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(String topic, String event)
|
||||
{
|
||||
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, event));
|
||||
pendingWriteRecords.add(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown()
|
||||
{
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws Exception
|
||||
{
|
||||
for (Future<RecordMetadata> future : pendingWriteRecords) {
|
||||
future.get();
|
||||
}
|
||||
pendingWriteRecords.clear();
|
||||
}
|
||||
|
||||
private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
|
||||
{
|
||||
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
|
||||
properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -42,7 +42,7 @@ import java.io.FileInputStream;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class KinesisAdminClient
|
||||
public class KinesisAdminClient implements StreamAdminClient
|
||||
{
|
||||
private AmazonKinesis amazonKinesis;
|
||||
|
||||
|
@ -70,6 +70,7 @@ public class KinesisAdminClient
|
|||
)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createStream(String streamName, int shardCount, Map<String, String> tags)
|
||||
{
|
||||
CreateStreamResult createStreamResult = amazonKinesis.createStream(streamName, shardCount);
|
||||
|
@ -88,6 +89,7 @@ public class KinesisAdminClient
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteStream(String streamName)
|
||||
{
|
||||
DeleteStreamResult deleteStreamResult = amazonKinesis.deleteStream(streamName);
|
||||
|
@ -101,9 +103,10 @@ public class KinesisAdminClient
|
|||
* If {@param blocksUntilStarted} is set to true, then this method will blocks until the resharding
|
||||
* started (but not nessesary finished), otherwise, the method will returns right after issue the reshard command
|
||||
*/
|
||||
public void updateShardCount(String streamName, int newShardCount, boolean blocksUntilStarted)
|
||||
@Override
|
||||
public void updatePartitionCount(String streamName, int newShardCount, boolean blocksUntilStarted)
|
||||
{
|
||||
int originalShardCount = getStreamShardCount(streamName);
|
||||
int originalShardCount = getStreamPartitionCount(streamName);
|
||||
UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest();
|
||||
updateShardCountRequest.setStreamName(streamName);
|
||||
updateShardCountRequest.setTargetShardCount(newShardCount);
|
||||
|
@ -129,18 +132,31 @@ public class KinesisAdminClient
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStreamActive(String streamName)
|
||||
{
|
||||
StreamDescription streamDescription = getStreamDescription(streamName);
|
||||
return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE);
|
||||
}
|
||||
|
||||
public int getStreamShardCount(String streamName)
|
||||
@Override
|
||||
public int getStreamPartitionCount(String streamName)
|
||||
{
|
||||
StreamDescription streamDescription = getStreamDescription(streamName);
|
||||
return getStreamShardCount(streamDescription);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount, int newShardCount)
|
||||
{
|
||||
int actualShardCount = getStreamPartitionCount(streamName);
|
||||
// Kinesis does not immediately drop the old shards after the resharding and hence,
|
||||
// would still returns both open shards and closed shards from the API call.
|
||||
// To verify, we sum the old count (closed shareds) and the expected new count (open shards)
|
||||
return actualShardCount == oldShardCount + newShardCount;
|
||||
}
|
||||
|
||||
|
||||
private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck)
|
||||
{
|
||||
return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
|
||||
|
|
|
@ -63,6 +63,24 @@ public class KinesisEventWriter implements StreamEventWriter
|
|||
this.kinesisProducer = new KinesisProducer(kinesisProducerConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTransactionEnabled()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initTransaction()
|
||||
{
|
||||
// No-Op as Kinesis does not support transaction
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitTransaction()
|
||||
{
|
||||
// No-Op as Kinesis does not support transaction
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(String streamName, String event)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This interface provides the administrative client contract for any stream storage (such as Kafka and Kinesis)
|
||||
* which supports managing and inspecting streams (aka topics) and stream's partitions (aka shards).
|
||||
* This is used for setting up, tearing down and any other administrative changes required in integration tests.
|
||||
* Each method resulting in a change of state for the stream is intended to be synchronous to help
|
||||
* make integration tests deterministic and easy to write.
|
||||
*/
|
||||
public interface StreamAdminClient
|
||||
{
|
||||
void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception;
|
||||
|
||||
void deleteStream(String streamName) throws Exception;
|
||||
|
||||
void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception;
|
||||
|
||||
boolean isStreamActive(String streamName);
|
||||
|
||||
int getStreamPartitionCount(String streamName) throws Exception;
|
||||
|
||||
boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception;
|
||||
}
|
|
@ -19,11 +19,22 @@
|
|||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
|
||||
/**
|
||||
* This interface is use to write test event data to the underlying stream (such as Kafka, Kinesis)
|
||||
* This can also be use with {@link StreamGenerator} to write particular set of test data
|
||||
*/
|
||||
public interface StreamEventWriter
|
||||
{
|
||||
void write(String topic, String event);
|
||||
|
||||
void shutdown();
|
||||
|
||||
void flush();
|
||||
void flush() throws Exception;
|
||||
|
||||
boolean isTransactionEnabled();
|
||||
|
||||
void initTransaction();
|
||||
|
||||
void commitTransaction();
|
||||
}
|
||||
|
|
|
@ -23,9 +23,9 @@ import org.joda.time.DateTime;
|
|||
|
||||
public interface StreamGenerator
|
||||
{
|
||||
void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
|
||||
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
|
||||
|
||||
void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
|
||||
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
|
||||
|
||||
void shutdown();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.testing.utils;
|
||||
|
||||
import com.google.inject.Injector;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.testng.ISuite;
|
||||
import org.testng.ISuiteListener;
|
||||
|
||||
public class SuiteListener implements ISuiteListener
|
||||
{
|
||||
private static final Logger LOG = new Logger(SuiteListener.class);
|
||||
|
||||
@Override
|
||||
public void onStart(ISuite suite)
|
||||
{
|
||||
Injector injector = DruidTestModuleFactory.getInjector();
|
||||
IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class);
|
||||
DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class);
|
||||
|
||||
druidClusterAdminClient.waitUntilCoordinatorReady();
|
||||
druidClusterAdminClient.waitUntilIndexerReady();
|
||||
druidClusterAdminClient.waitUntilBrokerReady();
|
||||
String routerHost = config.getRouterUrl();
|
||||
if (null != routerHost) {
|
||||
druidClusterAdminClient.waitUntilRouterReady();
|
||||
}
|
||||
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||
try {
|
||||
lifecycle.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish(ISuite suite)
|
||||
{
|
||||
Injector injector = DruidTestModuleFactory.getInjector();
|
||||
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||
lifecycle.stop();
|
||||
}
|
||||
}
|
|
@ -61,13 +61,13 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
|
|||
abstract Object getEvent(int row, DateTime timestamp);
|
||||
|
||||
@Override
|
||||
public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
|
||||
public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
|
||||
{
|
||||
start(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
|
||||
run(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime)
|
||||
public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime)
|
||||
{
|
||||
// The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond]
|
||||
// or the [overrrideFirstEventTime] as the primary timestamp.
|
||||
|
@ -94,6 +94,10 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
|
|||
nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
|
||||
);
|
||||
|
||||
if (streamEventWriter.isTransactionEnabled()) {
|
||||
streamEventWriter.initTransaction();
|
||||
}
|
||||
|
||||
for (int i = 1; i <= eventsPerSecond; i++) {
|
||||
streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp)));
|
||||
|
||||
|
@ -107,6 +111,10 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
|
|||
}
|
||||
}
|
||||
|
||||
if (streamEventWriter.isTransactionEnabled()) {
|
||||
streamEventWriter.commitTransaction();
|
||||
}
|
||||
|
||||
nowCeilingToSecond = nowCeilingToSecond.plusSeconds(1);
|
||||
eventTimestamp = eventTimestamp.plusSeconds(1);
|
||||
seconds++;
|
||||
|
|
|
@ -1,109 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package /*CHECKSTYLE.OFF: PackageName*/org.testng/*CHECKSTYLE.ON: PackageName*/;
|
||||
|
||||
import com.google.inject.Injector;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.utils.DruidClusterAdminClient;
|
||||
import org.testng.internal.IConfiguration;
|
||||
import org.testng.internal.annotations.IAnnotationFinder;
|
||||
import org.testng.xml.XmlTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This class must be in package org.testng to access protected methods like TestNG.getDefault().getConfiguration()
|
||||
*/
|
||||
public class DruidTestRunnerFactory implements ITestRunnerFactory
|
||||
{
|
||||
private static final Logger LOG = new Logger(DruidTestRunnerFactory.class);
|
||||
|
||||
@Override
|
||||
public TestRunner newTestRunner(ISuite suite, XmlTest test, List<IInvokedMethodListener> listeners)
|
||||
{
|
||||
IConfiguration configuration = TestNG.getDefault().getConfiguration();
|
||||
String outputDirectory = suite.getOutputDirectory();
|
||||
IAnnotationFinder annotationFinder = configuration.getAnnotationFinder();
|
||||
Boolean skipFailedInvocationCounts = suite.getXmlSuite().skipFailedInvocationCounts();
|
||||
return new DruidTestRunner(
|
||||
configuration,
|
||||
suite,
|
||||
test,
|
||||
outputDirectory,
|
||||
annotationFinder,
|
||||
skipFailedInvocationCounts,
|
||||
listeners
|
||||
);
|
||||
}
|
||||
|
||||
private static class DruidTestRunner extends TestRunner
|
||||
{
|
||||
|
||||
protected DruidTestRunner(
|
||||
IConfiguration configuration,
|
||||
ISuite suite,
|
||||
XmlTest test,
|
||||
String outputDirectory,
|
||||
IAnnotationFinder finder,
|
||||
boolean skipFailedInvocationCounts,
|
||||
List<IInvokedMethodListener> invokedMethodListeners
|
||||
)
|
||||
{
|
||||
super(configuration, suite, test, outputDirectory, finder, skipFailedInvocationCounts, invokedMethodListeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
Injector injector = DruidTestModuleFactory.getInjector();
|
||||
IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class);
|
||||
DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class);
|
||||
|
||||
druidClusterAdminClient.waitUntilCoordinatorReady();
|
||||
druidClusterAdminClient.waitUntilIndexerReady();
|
||||
druidClusterAdminClient.waitUntilBrokerReady();
|
||||
String routerHost = config.getRouterUrl();
|
||||
if (null != routerHost) {
|
||||
druidClusterAdminClient.waitUntilRouterReady();
|
||||
}
|
||||
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||
try {
|
||||
lifecycle.start();
|
||||
runTests();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void runTests()
|
||||
{
|
||||
super.run();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,6 +31,12 @@ public class TestNGGroup
|
|||
|
||||
public static final String KAFKA_INDEX = "kafka-index";
|
||||
|
||||
public static final String KAFKA_INDEX_SLOW = "kafka-index-slow";
|
||||
|
||||
public static final String TRANSACTIONAL_KAFKA_INDEX = "kafka-transactional-index";
|
||||
|
||||
public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow";
|
||||
|
||||
public static final String OTHER_INDEX = "other-index";
|
||||
|
||||
public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
|
||||
|
|
|
@ -243,6 +243,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
null);
|
||||
compactionResource.submitCompactionConfig(compactionConfig);
|
||||
|
||||
// Wait for compaction config to persist
|
||||
Thread.sleep(2000);
|
||||
|
||||
// Verify that the compaction config is updated correctly.
|
||||
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
|
||||
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
|
||||
|
|
|
@ -1,317 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.admin.RackAwareMode;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.I0Itec.zkclient.ZkConnection;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.druid.indexer.TaskIdUtils;
|
||||
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.apache.druid.testing.utils.TestQueryHelper;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
|
||||
protected static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
|
||||
protected static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
|
||||
private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
|
||||
private static final String TOPIC_NAME = "kafka_indexing_service_topic";
|
||||
|
||||
private static final int NUM_EVENTS_TO_SEND = 60;
|
||||
private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
|
||||
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
|
||||
|
||||
// We'll fill in the current time and numbers for added, deleted and changed
|
||||
// before sending the event.
|
||||
private static final String EVENT_TEMPLATE =
|
||||
"{\"timestamp\": \"%s\"," +
|
||||
"\"page\": \"Gypsy Danger\"," +
|
||||
"\"language\" : \"en\"," +
|
||||
"\"user\" : \"nuclear\"," +
|
||||
"\"unpatrolled\" : \"true\"," +
|
||||
"\"newPage\" : \"true\"," +
|
||||
"\"robot\": \"false\"," +
|
||||
"\"anonymous\": \"false\"," +
|
||||
"\"namespace\":\"article\"," +
|
||||
"\"continent\":\"North America\"," +
|
||||
"\"country\":\"United States\"," +
|
||||
"\"region\":\"Bay Area\"," +
|
||||
"\"city\":\"San Francisco\"," +
|
||||
"\"added\":%d," +
|
||||
"\"deleted\":%d," +
|
||||
"\"delta\":%d}";
|
||||
|
||||
private ZkUtils zkUtils;
|
||||
private boolean segmentsExist; // to tell if we should remove segments during teardown
|
||||
|
||||
// format for the querying interval
|
||||
private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
|
||||
// format for the expected timestamp in a query response
|
||||
private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
|
||||
|
||||
@Inject
|
||||
private TestQueryHelper queryHelper;
|
||||
@Inject
|
||||
private IntegrationTestingConfig config;
|
||||
|
||||
private String fullDatasourceName;
|
||||
|
||||
void doKafkaIndexTest(String dataSourceName, String supervisorSpecPath, boolean txnEnabled)
|
||||
{
|
||||
fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix();
|
||||
// create topic
|
||||
try {
|
||||
int sessionTimeoutMs = 10000;
|
||||
int connectionTimeoutMs = 10000;
|
||||
String zkHosts = config.getZookeeperHosts();
|
||||
ZkClient zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
|
||||
zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
|
||||
if (config.manageKafkaTopic()) {
|
||||
int numPartitions = 4;
|
||||
int replicationFactor = 1;
|
||||
Properties topicConfig = new Properties();
|
||||
AdminUtils.createTopic(
|
||||
zkUtils,
|
||||
TOPIC_NAME,
|
||||
numPartitions,
|
||||
replicationFactor,
|
||||
topicConfig,
|
||||
RackAwareMode.Disabled$.MODULE$
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ISE(e, "could not create kafka topic");
|
||||
}
|
||||
|
||||
String spec;
|
||||
try {
|
||||
LOG.info("supervisorSpec name: [%s]", supervisorSpecPath);
|
||||
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
|
||||
final Properties consumerProperties = new Properties();
|
||||
consumerProperties.putAll(consumerConfigs);
|
||||
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
|
||||
|
||||
spec = getResourceAsString(supervisorSpecPath);
|
||||
spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
|
||||
spec = StringUtils.replace(spec, "%%STREAM_TYPE%%", "kafka");
|
||||
spec = StringUtils.replace(spec, "%%TOPIC_KEY%%", "topic");
|
||||
spec = StringUtils.replace(spec, "%%TOPIC_VALUE%%", TOPIC_NAME);
|
||||
spec = StringUtils.replace(spec, "%%USE_EARLIEST_KEY%%", "useEarliestOffset");
|
||||
spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_KEY%%", "consumerProperties");
|
||||
spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_VALUE%%", jsonMapper.writeValueAsString(consumerProperties));
|
||||
LOG.info("supervisorSpec: [%s]\n", spec);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("could not read file [%s]", supervisorSpecPath);
|
||||
throw new ISE(e, "could not read file [%s]", supervisorSpecPath);
|
||||
}
|
||||
|
||||
// start supervisor
|
||||
String supervisorId = indexer.submitSupervisor(spec);
|
||||
LOG.info("Submitted supervisor");
|
||||
|
||||
// set up kafka producer
|
||||
Properties properties = new Properties();
|
||||
addFilteredProperties(config, properties);
|
||||
properties.setProperty("bootstrap.servers", config.getKafkaHost());
|
||||
LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
|
||||
properties.setProperty("acks", "all");
|
||||
properties.setProperty("retries", "3");
|
||||
properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
|
||||
properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
|
||||
if (txnEnabled) {
|
||||
properties.setProperty("enable.idempotence", "true");
|
||||
properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
|
||||
}
|
||||
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(
|
||||
properties,
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
|
||||
DateTimeZone zone = DateTimes.inferTzFromString("UTC");
|
||||
// format for putting into events
|
||||
DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||
|
||||
DateTime dt = new DateTime(zone); // timestamp to put on events
|
||||
DateTime dtFirst = dt; // timestamp of 1st event
|
||||
DateTime dtLast = dt; // timestamp of last event
|
||||
|
||||
// these are used to compute the expected aggregations
|
||||
int added = 0;
|
||||
int num_events = 0;
|
||||
|
||||
// send data to kafka
|
||||
if (txnEnabled) {
|
||||
producer.initTransactions();
|
||||
producer.beginTransaction();
|
||||
}
|
||||
while (num_events < NUM_EVENTS_TO_SEND) {
|
||||
num_events++;
|
||||
added += num_events;
|
||||
// construct the event to send
|
||||
String event = StringUtils.format(EVENT_TEMPLATE, event_fmt.print(dt), num_events, 0, num_events);
|
||||
LOG.info("sending event: [%s]", event);
|
||||
try {
|
||||
|
||||
producer.send(new ProducerRecord<>(TOPIC_NAME, event)).get();
|
||||
|
||||
}
|
||||
catch (Exception ioe) {
|
||||
throw Throwables.propagate(ioe);
|
||||
}
|
||||
|
||||
dtLast = dt;
|
||||
dt = new DateTime(zone);
|
||||
}
|
||||
if (txnEnabled) {
|
||||
producer.commitTransaction();
|
||||
}
|
||||
producer.close();
|
||||
|
||||
LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
|
||||
try {
|
||||
Thread.sleep(WAIT_TIME_MILLIS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE);
|
||||
if (null == is) {
|
||||
throw new ISE("could not open query file: %s", QUERIES_FILE);
|
||||
}
|
||||
|
||||
// put the timestamps into the query structure
|
||||
String query_response_template;
|
||||
try {
|
||||
query_response_template = IOUtils.toString(is, StandardCharsets.UTF_8);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
|
||||
}
|
||||
|
||||
String queryStr = query_response_template;
|
||||
queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added));
|
||||
queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
|
||||
|
||||
// this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
|
||||
try {
|
||||
this.queryHelper.testQueriesFromString(queryStr, 2);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
LOG.info("Shutting down Kafka Supervisor");
|
||||
indexer.shutdownSupervisor(supervisorId);
|
||||
|
||||
// wait for all kafka indexing tasks to finish
|
||||
LOG.info("Waiting for all kafka indexing tasks to finish");
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> (indexer.getPendingTasks().size()
|
||||
+ indexer.getRunningTasks().size()
|
||||
+ indexer.getWaitingTasks().size()) == 0,
|
||||
"Waiting for Tasks Completion"
|
||||
);
|
||||
|
||||
// wait for segments to be handed off
|
||||
try {
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Real-time generated segments loaded"
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
LOG.info("segments are present");
|
||||
segmentsExist = true;
|
||||
|
||||
// this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
|
||||
try {
|
||||
this.queryHelper.testQueriesFromString(queryStr, 2);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
|
||||
{
|
||||
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
|
||||
properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void doTearDown()
|
||||
{
|
||||
if (config.manageKafkaTopic()) {
|
||||
// delete kafka topic
|
||||
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
|
||||
}
|
||||
|
||||
// remove segments
|
||||
if (segmentsExist && fullDatasourceName != null) {
|
||||
unloadAndKillData(fullDatasourceName);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.utils.KafkaAdminClient;
|
||||
import org.apache.druid.testing.utils.KafkaEventWriter;
|
||||
import org.apache.druid.testing.utils.StreamAdminClient;
|
||||
import org.apache.druid.testing.utils.StreamEventWriter;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
|
||||
{
|
||||
protected abstract boolean isKafkaWriterTransactionalEnabled();
|
||||
|
||||
@Override
|
||||
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
|
||||
{
|
||||
return new KafkaAdminClient(config.getKafkaHost());
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config)
|
||||
{
|
||||
return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled());
|
||||
}
|
||||
|
||||
@Override
|
||||
Function<String, String> generateStreamIngestionPropsTransform(String streamName,
|
||||
String fullDatasourceName,
|
||||
IntegrationTestingConfig config)
|
||||
{
|
||||
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
|
||||
final Properties consumerProperties = new Properties();
|
||||
consumerProperties.putAll(consumerConfigs);
|
||||
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
|
||||
return spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_TYPE%%",
|
||||
"kafka"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_KEY%%",
|
||||
"topic"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_VALUE%%",
|
||||
streamName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%USE_EARLIEST_KEY%%",
|
||||
"useEarliestOffset"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_KEY%%",
|
||||
"consumerProperties"
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_VALUE%%",
|
||||
jsonMapper.writeValueAsString(consumerProperties)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName)
|
||||
{
|
||||
return spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_START%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_END%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_ADDED%%",
|
||||
Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_NUMEVENTS%%",
|
||||
Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.utils.KinesisAdminClient;
|
||||
import org.apache.druid.testing.utils.KinesisEventWriter;
|
||||
import org.apache.druid.testing.utils.StreamAdminClient;
|
||||
import org.apache.druid.testing.utils.StreamEventWriter;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
|
||||
{
|
||||
@Override
|
||||
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception
|
||||
{
|
||||
return new KinesisAdminClient(config.getStreamEndpoint());
|
||||
}
|
||||
|
||||
@Override
|
||||
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception
|
||||
{
|
||||
return new KinesisEventWriter(config.getStreamEndpoint(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
Function<String, String> generateStreamIngestionPropsTransform(String streamName,
|
||||
String fullDatasourceName,
|
||||
IntegrationTestingConfig config)
|
||||
{
|
||||
return spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_TYPE%%",
|
||||
"kinesis"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_KEY%%",
|
||||
"stream"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_VALUE%%",
|
||||
streamName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%USE_EARLIEST_KEY%%",
|
||||
"useEarliestSequenceNumber"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_KEY%%",
|
||||
"endpoint"
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_VALUE%%",
|
||||
jsonMapper.writeValueAsString(config.getStreamEndpoint())
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName)
|
||||
{
|
||||
return spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_START%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_END%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_ADDED%%",
|
||||
Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_NUMEVENTS%%",
|
||||
Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,450 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.utils.DruidClusterAdminClient;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.apache.druid.testing.utils.StreamAdminClient;
|
||||
import org.apache.druid.testing.utils.StreamEventWriter;
|
||||
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
|
||||
{
|
||||
static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
|
||||
// format for the querying interval
|
||||
static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
|
||||
// format for the expected timestamp in a query response
|
||||
static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
|
||||
static final int EVENTS_PER_SECOND = 6;
|
||||
static final int TOTAL_NUMBER_OF_SECOND = 10;
|
||||
// Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created
|
||||
// to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method)
|
||||
// The value to this tag is a timestamp that can be used by a lambda function to remove unused stream.
|
||||
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
|
||||
private static final int STREAM_SHARD_COUNT = 2;
|
||||
private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
|
||||
private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
|
||||
private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
|
||||
private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
|
||||
private static final long CYCLE_PADDING_MS = 100;
|
||||
private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
|
||||
|
||||
@Inject
|
||||
private DruidClusterAdminClient druidClusterAdminClient;
|
||||
|
||||
@Inject
|
||||
private IntegrationTestingConfig config;
|
||||
|
||||
private StreamAdminClient streamAdminClient;
|
||||
private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
|
||||
|
||||
abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
|
||||
abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
|
||||
abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
|
||||
String fullDatasourceName,
|
||||
IntegrationTestingConfig config);
|
||||
abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
|
||||
public abstract String getTestNamePrefix();
|
||||
|
||||
protected void doBeforeClass() throws Exception
|
||||
{
|
||||
streamAdminClient = createStreamAdminClient(config);
|
||||
wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
|
||||
}
|
||||
|
||||
protected void doClassTeardown()
|
||||
{
|
||||
wikipediaStreamEventGenerator.shutdown();
|
||||
}
|
||||
|
||||
protected void doTestIndexDataWithLegacyParserStableState() throws Exception
|
||||
{
|
||||
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
|
||||
try (
|
||||
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start data generator
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
|
||||
verifyIngestedData(generatedTestConfig);
|
||||
}
|
||||
finally {
|
||||
doMethodTeardown(generatedTestConfig, streamEventWriter);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doTestIndexDataWithInputFormatStableState() throws Exception
|
||||
{
|
||||
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
|
||||
try (
|
||||
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start data generator
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
|
||||
verifyIngestedData(generatedTestConfig);
|
||||
}
|
||||
finally {
|
||||
doMethodTeardown(generatedTestConfig, streamEventWriter);
|
||||
}
|
||||
}
|
||||
|
||||
void doTestIndexDataWithLosingCoordinator() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
|
||||
}
|
||||
|
||||
void doTestIndexDataWithLosingOverlord() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
|
||||
}
|
||||
|
||||
void doTestIndexDataWithLosingHistorical() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
|
||||
}
|
||||
|
||||
protected void doTestIndexDataWithStartStopSupervisor() throws Exception
|
||||
{
|
||||
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
|
||||
try (
|
||||
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating half of the data
|
||||
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before suspension
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Suspend the supervisor
|
||||
indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
|
||||
// Start generating remainning half of the data
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Resume the supervisor
|
||||
indexer.resumeSupervisor(generatedTestConfig.getSupervisorId());
|
||||
// Verify supervisor is healthy after suspension
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor can catch up with the stream
|
||||
verifyIngestedData(generatedTestConfig);
|
||||
}
|
||||
finally {
|
||||
doMethodTeardown(generatedTestConfig, streamEventWriter);
|
||||
}
|
||||
}
|
||||
|
||||
protected void doTestIndexDataWithStreamReshardSplit() throws Exception
|
||||
{
|
||||
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
|
||||
testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2);
|
||||
}
|
||||
|
||||
protected void doTestIndexDataWithStreamReshardMerge() throws Exception
|
||||
{
|
||||
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
|
||||
testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2);
|
||||
}
|
||||
|
||||
private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
|
||||
{
|
||||
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
|
||||
try (
|
||||
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating one third of the data (before restarting)
|
||||
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before restart
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Restart Druid process
|
||||
LOG.info("Restarting Druid process");
|
||||
restartRunnable.run();
|
||||
LOG.info("Restarted Druid process");
|
||||
// Start generating one third of the data (while restarting)
|
||||
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Wait for Druid process to be available
|
||||
LOG.info("Waiting for Druid process to be available");
|
||||
waitForReadyRunnable.run();
|
||||
LOG.info("Druid process is now available");
|
||||
// Start generating remaining data (after restarting)
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
|
||||
// Verify supervisor is healthy
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor ingested all data
|
||||
verifyIngestedData(generatedTestConfig);
|
||||
}
|
||||
finally {
|
||||
doMethodTeardown(generatedTestConfig, streamEventWriter);
|
||||
}
|
||||
}
|
||||
|
||||
private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception
|
||||
{
|
||||
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
|
||||
try (
|
||||
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating one third of the data (before resharding)
|
||||
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before resahrding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Reshard the supervisor by split from STREAM_SHARD_COUNT to newShardCount and waits until the resharding starts
|
||||
streamAdminClient.updatePartitionCount(generatedTestConfig.getStreamName(), newShardCount, true);
|
||||
// Start generating one third of the data (while resharding)
|
||||
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Wait for stream to finish resharding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for stream to finish resharding"
|
||||
);
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for stream to finish resharding"
|
||||
);
|
||||
// Start generating remaining data (after resharding)
|
||||
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
|
||||
// Verify supervisor is healthy after resahrding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor can catch up with the stream
|
||||
verifyIngestedData(generatedTestConfig);
|
||||
}
|
||||
finally {
|
||||
doMethodTeardown(generatedTestConfig, streamEventWriter);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception
|
||||
{
|
||||
// Wait for supervisor to consume events
|
||||
LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS);
|
||||
Thread.sleep(WAIT_TIME_MILLIS);
|
||||
// Query data
|
||||
final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE));
|
||||
// this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
|
||||
this.queryHelper.testQueriesFromString(querySpec, 2);
|
||||
LOG.info("Shutting down supervisor");
|
||||
indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
|
||||
// wait for all indexing tasks to finish
|
||||
LOG.info("Waiting for all indexing tasks to finish");
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> (indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() == 0),
|
||||
"Waiting for Tasks Completion"
|
||||
);
|
||||
// wait for segments to be handed off
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Real-time generated segments loaded"
|
||||
);
|
||||
|
||||
// this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
|
||||
this.queryHelper.testQueriesFromString(querySpec, 2);
|
||||
}
|
||||
|
||||
long getSumOfEventSequence(int numEvents)
|
||||
{
|
||||
return (numEvents * (1 + numEvents)) / 2;
|
||||
}
|
||||
|
||||
private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter)
|
||||
{
|
||||
try {
|
||||
streamEventWriter.flush();
|
||||
streamEventWriter.shutdown();
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the writer may have already been cleanup
|
||||
LOG.warn(e, "Failed to cleanup writer. This might be expected depending on the test method");
|
||||
}
|
||||
try {
|
||||
indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the supervisor may have already been cleanup
|
||||
LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method");
|
||||
}
|
||||
try {
|
||||
unloader(generatedTestConfig.getFullDatasourceName());
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the datasource may have already been cleanup
|
||||
LOG.warn(e, "Failed to cleanup datasource. This might be expected depending on the test method");
|
||||
}
|
||||
try {
|
||||
streamAdminClient.deleteStream(generatedTestConfig.getStreamName());
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the stream may have already been cleanup
|
||||
LOG.warn(e, "Failed to cleanup stream. This might be expected depending on the test method");
|
||||
}
|
||||
}
|
||||
|
||||
private class GeneratedTestConfig
|
||||
{
|
||||
private String streamName;
|
||||
private String fullDatasourceName;
|
||||
private String supervisorId;
|
||||
private Function<String, String> streamIngestionPropsTransform;
|
||||
private Function<String, String> streamQueryPropsTransform;
|
||||
|
||||
GeneratedTestConfig() throws Exception
|
||||
{
|
||||
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
|
||||
String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
|
||||
Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
|
||||
streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags);
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> streamAdminClient.isStreamActive(streamName),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Wait for stream active"
|
||||
);
|
||||
fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
|
||||
streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config);
|
||||
streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
|
||||
}
|
||||
|
||||
public String getSupervisorId()
|
||||
{
|
||||
return supervisorId;
|
||||
}
|
||||
|
||||
public void setSupervisorId(String supervisorId)
|
||||
{
|
||||
this.supervisorId = supervisorId;
|
||||
}
|
||||
|
||||
public String getStreamName()
|
||||
{
|
||||
return streamName;
|
||||
}
|
||||
|
||||
public String getFullDatasourceName()
|
||||
{
|
||||
return fullDatasourceName;
|
||||
}
|
||||
|
||||
public Function<String, String> getStreamIngestionPropsTransform()
|
||||
{
|
||||
return streamIngestionPropsTransform;
|
||||
}
|
||||
|
||||
public Function<String, String> getStreamQueryPropsTransform()
|
||||
{
|
||||
return streamQueryPropsTransform;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KAFKA_INDEX_SLOW)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
protected boolean isKafkaWriterTransactionalEnabled()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kafka_nontransactional_serialized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingCoordinator() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingCoordinator();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingOverlord() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingOverlord();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingHistorical() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingHistorical();
|
||||
}
|
||||
}
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KAFKA_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
|
||||
private static final String DATASOURCE = "kafka_indexing_service_test";
|
||||
|
||||
@DataProvider
|
||||
public static Object[][] testParams()
|
||||
{
|
||||
return new Object[][]{
|
||||
{"legacy_parser"},
|
||||
{"input_format"}
|
||||
};
|
||||
}
|
||||
|
||||
@Test(dataProvider = "testParams")
|
||||
public void testKafka(String param)
|
||||
{
|
||||
final String supervisorSpecPath = "legacy_parser".equals(param)
|
||||
? INDEXER_FILE_LEGACY_PARSER
|
||||
: INDEXER_FILE_INPUT_FORMAT;
|
||||
LOG.info("Starting test: ITKafkaIndexingServiceTest");
|
||||
doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, false);
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void afterClass()
|
||||
{
|
||||
LOG.info("teardown");
|
||||
doTearDown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
protected boolean isKafkaWriterTransactionalEnabled()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kafka_transactional_serialized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingCoordinator() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingCoordinator();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingOverlord() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingOverlord();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLosingHistorical() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingHistorical();
|
||||
}
|
||||
}
|
|
@ -1,66 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
/**
|
||||
* This is a test for the Kafka indexing service with transactional topics
|
||||
*/
|
||||
@Test(groups = TestNGGroup.KAFKA_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class);
|
||||
private static final String DATASOURCE = "kafka_indexing_service_txn_test";
|
||||
|
||||
@DataProvider
|
||||
public static Object[][] testParams()
|
||||
{
|
||||
return new Object[][]{
|
||||
{"legacy_parser"},
|
||||
{"input_format"}
|
||||
};
|
||||
}
|
||||
|
||||
@Test(dataProvider = "testParams")
|
||||
public void testKafka(String param)
|
||||
{
|
||||
final String supervisorSpecPath = "legacy_parser".equals(param)
|
||||
? INDEXER_FILE_LEGACY_PARSER
|
||||
: INDEXER_FILE_INPUT_FORMAT;
|
||||
LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest");
|
||||
doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, true);
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void afterClass()
|
||||
{
|
||||
LOG.info("teardown");
|
||||
doTearDown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KINESIS_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kinesis_serialized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingCoordinator() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingCoordinator();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingOverlord() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingOverlord();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test must be run individually since the test affect and modify the state of the Druid cluster
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingHistorical() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLosingHistorical();
|
||||
}
|
||||
}
|
|
@ -1,480 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.utils.DruidClusterAdminClient;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.apache.druid.testing.utils.KinesisAdminClient;
|
||||
import org.apache.druid.testing.utils.KinesisEventWriter;
|
||||
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Test(groups = TestNGGroup.KINESIS_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
|
||||
private static final int KINESIS_SHARD_COUNT = 2;
|
||||
// Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created
|
||||
// to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method)
|
||||
// The value to this tag is a timestamp that can be used by a lambda function to remove unused stream.
|
||||
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
|
||||
private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
|
||||
private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
|
||||
private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
|
||||
private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
|
||||
private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
|
||||
// format for the querying interval
|
||||
private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
|
||||
// format for the expected timestamp in a query response
|
||||
private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
|
||||
private static final int EVENTS_PER_SECOND = 6;
|
||||
private static final long CYCLE_PADDING_MS = 100;
|
||||
private static final int TOTAL_NUMBER_OF_SECOND = 10;
|
||||
|
||||
@Inject
|
||||
private DruidClusterAdminClient druidClusterAdminClient;
|
||||
|
||||
private String streamName;
|
||||
private String fullDatasourceName;
|
||||
private KinesisAdminClient kinesisAdminClient;
|
||||
private KinesisEventWriter kinesisEventWriter;
|
||||
private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
|
||||
private Function<String, String> kinesisIngestionPropsTransform;
|
||||
private Function<String, String> kinesisQueryPropsTransform;
|
||||
private String supervisorId;
|
||||
private int secondsToGenerateRemaining;
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
|
||||
kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
|
||||
wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
wikipediaStreamEventGenerator.shutdown();
|
||||
kinesisEventWriter.shutdown();
|
||||
}
|
||||
|
||||
@BeforeMethod
|
||||
public void before()
|
||||
{
|
||||
streamName = "kinesis_index_test_" + UUID.randomUUID();
|
||||
String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
|
||||
Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
|
||||
kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> kinesisAdminClient.isStreamActive(streamName),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Wait for stream active"
|
||||
);
|
||||
secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
|
||||
fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
|
||||
kinesisIngestionPropsTransform = spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_TYPE%%",
|
||||
"kinesis"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_KEY%%",
|
||||
"stream"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TOPIC_VALUE%%",
|
||||
streamName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%USE_EARLIEST_KEY%%",
|
||||
"useEarliestSequenceNumber"
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_KEY%%",
|
||||
"endpoint"
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%STREAM_PROPERTIES_VALUE%%",
|
||||
jsonMapper.writeValueAsString(config.getStreamEndpoint())
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
kinesisQueryPropsTransform = spec -> {
|
||||
try {
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_START%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_QUERY_END%%",
|
||||
INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_RESPONSE_TIMESTAMP%%",
|
||||
TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
|
||||
);
|
||||
spec = StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_ADDED%%",
|
||||
Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
return StringUtils.replace(
|
||||
spec,
|
||||
"%%TIMESERIES_NUMEVENTS%%",
|
||||
Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void teardown()
|
||||
{
|
||||
try {
|
||||
kinesisEventWriter.flush();
|
||||
indexer.shutdownSupervisor(supervisorId);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the supervisor may have already went Bye-Bye
|
||||
}
|
||||
try {
|
||||
unloader(fullDatasourceName);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the datasource may have already went Bye-Bye
|
||||
}
|
||||
try {
|
||||
kinesisAdminClient.deleteStream(streamName);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Best effort cleanup as the stream may have already went Bye-Bye
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLegacyParserStableState() throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(fullDatasourceName)
|
||||
) {
|
||||
final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
supervisorId = indexer.submitSupervisor(taskSpec);
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start Kinesis data generator
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
|
||||
verifyIngestedData(supervisorId);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithInputFormatStableState() throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(fullDatasourceName)
|
||||
) {
|
||||
final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
supervisorId = indexer.submitSupervisor(taskSpec);
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start Kinesis data generator
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
|
||||
verifyIngestedData(supervisorId);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingCoordinator() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingOverlord() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLosingHistorical() throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(fullDatasourceName)
|
||||
) {
|
||||
final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
supervisorId = indexer.submitSupervisor(taskSpec);
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating half of the data
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before suspension
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Suspend the supervisor
|
||||
indexer.suspendSupervisor(supervisorId);
|
||||
// Start generating remainning half of the data
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Resume the supervisor
|
||||
indexer.resumeSupervisor(supervisorId);
|
||||
// Verify supervisor is healthy after suspension
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor can catch up with the stream
|
||||
verifyIngestedData(supervisorId);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
|
||||
{
|
||||
// Reshard the supervisor by split from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
|
||||
testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
|
||||
{
|
||||
// Reshard the supervisor by split from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
|
||||
testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
|
||||
}
|
||||
|
||||
private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(fullDatasourceName)
|
||||
) {
|
||||
final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
supervisorId = indexer.submitSupervisor(taskSpec);
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating one third of the data (before restarting)
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before restart
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Restart Druid process
|
||||
LOG.info("Restarting Druid process");
|
||||
restartRunnable.run();
|
||||
LOG.info("Restarted Druid process");
|
||||
// Start generating one third of the data (while restarting)
|
||||
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Wait for Druid process to be available
|
||||
LOG.info("Waiting for Druid process to be available");
|
||||
waitForReadyRunnable.run();
|
||||
LOG.info("Druid process is now available");
|
||||
// Start generating remainding data (after restarting)
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
|
||||
// Verify supervisor is healthy
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor ingested all data
|
||||
verifyIngestedData(supervisorId);
|
||||
}
|
||||
}
|
||||
|
||||
private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(fullDatasourceName)
|
||||
) {
|
||||
final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
supervisorId = indexer.submitSupervisor(taskSpec);
|
||||
LOG.info("Submitted supervisor");
|
||||
// Start generating one third of the data (before resharding)
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
|
||||
// Verify supervisor is healthy before resahrding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Reshard the supervisor by split from KINESIS_SHARD_COUNT to newShardCount and waits until the resharding starts
|
||||
kinesisAdminClient.updateShardCount(streamName, newShardCount, true);
|
||||
// Start generating one third of the data (while resharding)
|
||||
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
|
||||
// Wait for kinesis stream to finish resharding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> kinesisAdminClient.isStreamActive(streamName),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for Kinesis stream to finish resharding"
|
||||
);
|
||||
// Start generating remainding data (after resharding)
|
||||
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
|
||||
// Verify supervisor is healthy after resahrding
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Waiting for supervisor to be healthy"
|
||||
);
|
||||
// Verify that supervisor can catch up with the stream
|
||||
verifyIngestedData(supervisorId);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyIngestedData(String supervisorId) throws Exception
|
||||
{
|
||||
// Wait for supervisor to consume events
|
||||
LOG.info("Waiting for [%s] millis for Kinesis indexing tasks to consume events", WAIT_TIME_MILLIS);
|
||||
Thread.sleep(WAIT_TIME_MILLIS);
|
||||
// Query data
|
||||
final String querySpec = kinesisQueryPropsTransform.apply(getResourceAsString(QUERIES_FILE));
|
||||
// this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
|
||||
this.queryHelper.testQueriesFromString(querySpec, 2);
|
||||
LOG.info("Shutting down supervisor");
|
||||
indexer.shutdownSupervisor(supervisorId);
|
||||
// wait for all Kinesis indexing tasks to finish
|
||||
LOG.info("Waiting for all indexing tasks to finish");
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> (indexer.getPendingTasks().size()
|
||||
+ indexer.getRunningTasks().size()
|
||||
+ indexer.getWaitingTasks().size()) == 0,
|
||||
"Waiting for Tasks Completion"
|
||||
);
|
||||
// wait for segments to be handed off
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
|
||||
true,
|
||||
10000,
|
||||
30,
|
||||
"Real-time generated segments loaded"
|
||||
);
|
||||
|
||||
// this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
|
||||
this.queryHelper.testQueriesFromString(querySpec, 2);
|
||||
}
|
||||
private long getSumOfEventSequence(int numEvents)
|
||||
{
|
||||
return (numEvents * (1 + numEvents)) / 2;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.parallelized;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KAFKA_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
protected boolean isKafkaWriterTransactionalEnabled()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kafka_non_transactional_parallelized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLegacyParserStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithInputFormatStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithInputFormatStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStartStopSupervisor();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStreamReshardSplit();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.parallelized;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
protected boolean isKafkaWriterTransactionalEnabled()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kafka_transactional_parallelized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLegacyParserStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithInputFormatStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithInputFormatStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStartStopSupervisor();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStreamReshardSplit();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.parallelized;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KINESIS_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kinesis_parallelized";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void tearDown()
|
||||
{
|
||||
doClassTeardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithLegacyParserStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithLegacyParserStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithInputFormatStableState() throws Exception
|
||||
{
|
||||
doTestIndexDataWithInputFormatStableState();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStartStopSupervisor();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStreamReshardSplit();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
|
||||
* and supervisor maintained and scoped within this test only
|
||||
*/
|
||||
@Test
|
||||
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
|
||||
{
|
||||
doTestIndexDataWithStreamReshardMerge();
|
||||
}
|
||||
}
|
|
@ -21,14 +21,21 @@
|
|||
|
||||
|
||||
<suite name="IntegrationTestSuite">
|
||||
<listeners>
|
||||
<listener class-name="org.apache.druid.testing.utils.LoggerListener" />
|
||||
</listeners>
|
||||
<test name="AllTests">
|
||||
<packages>
|
||||
<package name="org.apache.druid.tests.*">
|
||||
<exclude name="org.apache.druid.tests.hadoop"/>
|
||||
</package>
|
||||
</packages>
|
||||
</test>
|
||||
<listeners>
|
||||
<listener class-name="org.apache.druid.testing.utils.LoggerListener" />
|
||||
<listener class-name="org.apache.druid.testing.utils.SuiteListener" />
|
||||
</listeners>
|
||||
<test name="AllSerializedTests">
|
||||
<packages>
|
||||
<package name="org.apache.druid.tests.*">
|
||||
<exclude name="org.apache.druid.tests.hadoop"/>
|
||||
<exclude name="org.apache.druid.tests.parallelized"/>
|
||||
</package>
|
||||
</packages>
|
||||
</test>
|
||||
<test name="AllParallelizedTests" parallel="methods" thread-count="2">
|
||||
<packages>
|
||||
<package name="org.apache.druid.tests.parallelized.*"/>
|
||||
</packages>
|
||||
</test>
|
||||
</suite>
|
||||
|
|
|
@ -14,6 +14,12 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Skip stopping docker if flag set (For use during development)
|
||||
if [ -n "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" == true ]
|
||||
then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
|
||||
|
||||
do
|
||||
|
|
3
pom.xml
3
pom.xml
|
@ -1196,7 +1196,7 @@
|
|||
<dependency>
|
||||
<groupId>org.testng</groupId>
|
||||
<artifactId>testng</artifactId>
|
||||
<version>6.8.7</version>
|
||||
<version>6.14.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
|
@ -1253,7 +1253,6 @@
|
|||
<!-- Ignore non-production code -->
|
||||
<exclude>org/apache/druid/benchmark/**/*</exclude> <!-- benchmarks -->
|
||||
<exclude>org/apache/druid/**/*Benchmark*</exclude> <!-- benchmarks -->
|
||||
<exclude>org/testng/DruidTestRunnerFactory*</exclude> <!-- benchmarks -->
|
||||
<exclude>org/apache/druid/testing/**/*</exclude> <!-- integration-tests -->
|
||||
</excludes>
|
||||
</configuration>
|
||||
|
|
Loading…
Reference in New Issue