Integration tests for stream ingestion with various data formats (#9783)

* Integration tests for stream ingestion with various data formats

* fix npe

* better logging; fix tsv

* fix tsv

* exclude kinesis from travis

* some readme
Jihoon Son 2020-04-29 13:18:01 -07:00 committed by GitHub
parent 7510e6e722
commit 39722bd064
45 changed files with 1128 additions and 555 deletions

View File

@@ -337,6 +337,14 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_kafka_format_tests
name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_query
name: "(Compile=openjdk8, Run=openjdk8) query integration test"
jdk: openjdk8
@@ -365,7 +373,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -399,7 +407,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"

View File

@@ -319,7 +319,10 @@ Refer to ITIndexerTest as an example of how to use dependency injection
By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test
class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
test tag section in integration-tests/src/test/resources/testng.xml
test tag section in integration-tests/src/test/resources/testng.xml. TestNG uses two parameters,
`thread-count` and `data-provider-thread-count`, for parallel test execution; both are set to 2 for Druid integration tests.
You may want to modify those values for faster execution.
See https://testng.org/doc/documentation-main.html#parallel-running and https://testng.org/doc/documentation-main.html#parameters-dataproviders for details.
Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with
other tests from the same class at the same time, i.e., the test does not modify/restart/stop the Druid cluster or other dependency containers,
does not use excessive memory starving other concurrent tasks, and does not modify and/or use other tasks,

View File

@@ -37,6 +37,14 @@
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
@@ -81,6 +89,12 @@
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-avro-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>

View File

@@ -253,7 +253,7 @@ public class OverlordResourceTestClient
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting supervisor to overlord, response [%s %s]",
"Error while submitting supervisor to overlord, response [%s: %s]",
response.getStatus(),
response.getContent()
);

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.druid.java.util.common.Pair;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AvroEventSerializer implements EventSerializer
{
public static final String TYPE = "avro";
private static final Schema SCHEMA = SchemaBuilder
.record("wikipedia")
.namespace("org.apache.druid")
.fields()
.requiredString("timestamp")
.requiredString("page")
.requiredString("language")
.requiredString("user")
.requiredString("unpatrolled")
.requiredString("newPage")
.requiredString("robot")
.requiredString("anonymous")
.requiredString("namespace")
.requiredString("continent")
.requiredString("country")
.requiredString("region")
.requiredString("city")
.requiredInt("added")
.requiredInt("deleted")
.requiredInt("delta")
.endRecord();
private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
@Override
public byte[] serialize(List<Pair<String, Object>> event) throws IOException
{
final WikipediaRecord record = new WikipediaRecord();
event.forEach(pair -> record.put(pair.lhs, pair.rhs));
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
encoder.flush();
out.close();
return out.toByteArray();
}
@Override
public void close()
{
}
private static class WikipediaRecord implements GenericRecord
{
private final Map<String, Object> event = new HashMap<>();
private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());
private int nextIndex = 0;
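// Field names get positional indexes in insertion order via the BiMap, so the
// positional put/get below can map an index back to its field name.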
@Override
public void put(String key, Object v)
{
event.put(key, v);
indexes.inverse().computeIfAbsent(key, k -> nextIndex++);
}
@Override
public Object get(String key)
{
return event.get(key);
}
@Override
public void put(int i, Object v)
{
final String key = indexes.get(i);
if (key == null) {
throw new IndexOutOfBoundsException();
}
put(key, v);
}
@Override
public Object get(int i)
{
final String key = indexes.get(i);
if (key == null) {
throw new IndexOutOfBoundsException();
}
return get(key);
}
@Override
public Schema getSchema()
{
return SCHEMA;
}
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.opencsv.CSVWriter;
import org.apache.druid.java.util.common.Pair;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class CsvEventSerializer implements EventSerializer
{
public static final String TYPE = "csv";
private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
private final CSVWriter writer = new CSVWriter(
new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8))
);
@Override
public byte[] serialize(List<Pair<String, Object>> event) throws IOException
{
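// Reuse one buffer across events: write a CSV row, snapshot the bytes, then reset.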
//noinspection ConstantConditions
writer.writeNext(event.stream().map(pair -> pair.rhs.toString()).toArray(String[]::new));
writer.flush();
final byte[] serialized = bos.toByteArray();
bos.reset();
return serialized;
}
@Override
public void close() throws IOException
{
writer.close();
}
}

View File

@@ -19,37 +19,25 @@
package org.apache.druid.testing.utils;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import java.util.UUID;
import java.util.List;
import java.util.stream.Collectors;
public class StreamVerifierEventGenerator extends SyntheticStreamGenerator
public class DelimitedEventSerializer implements EventSerializer
{
public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs)
public static final String TYPE = "tsv";
@Override
public byte[] serialize(List<Pair<String, Object>> event)
{
super(eventsPerSeconds, cyclePaddingMs);
//noinspection ConstantConditions
return StringUtils.toUtf8(event.stream().map(pair -> pair.rhs.toString()).collect(Collectors.joining("\t")));
}
@Override
Object getEvent(int i, DateTime timestamp)
public void close()
{
return StreamVerifierSyntheticEvent.of(
UUID.randomUUID().toString(),
timestamp.getMillis(),
DateTimes.nowUtc().getMillis(),
i,
i == getEventsPerSecond() ? getSumOfEventSequence(getEventsPerSecond()) : null,
i == 1
);
}
/**
* Assumes the first number in the sequence is 1, incrementing by 1, until numEvents.
*/
private long getSumOfEventSequence(int numEvents)
{
return (numEvents * (1 + numEvents)) / 2;
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.java.util.common.Pair;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* EventSerializer is for serializing an event into a byte array.
* This interface is used to write generated events on stream processing systems such as Kafka or Kinesis
* in integration tests.
*
* @see SyntheticStreamGenerator
* @see StreamEventWriter
*/
@JsonTypeInfo(use = Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
@Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
byte[] serialize(List<Pair<String, Object>> event) throws IOException;
}
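
A minimal usage sketch (illustrative only; EventSerializerExample is a hypothetical class, not part of this commit) showing how name-value pairs become a payload ready for a StreamEventWriter:

package org.apache.druid.testing.utils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.druid.java.util.common.Pair;
public class EventSerializerExample
{
  public static void main(String[] args) throws Exception
  {
    // Each Pair is (column name, value); pair order defines the column order for csv/tsv.
    List<Pair<String, Object>> event = new ArrayList<>();
    event.add(Pair.of("timestamp", "2013-08-31T01:02:33Z"));
    event.add(Pair.of("page", "Gypsy Danger"));
    event.add(Pair.of("added", 57));
    try (EventSerializer serializer = new CsvEventSerializer()) {
      byte[] payload = serializer.serialize(event);
      // payload holds one CSV row, ready for StreamEventWriter#write(topic, payload).
      System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
  }
}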

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Pair;
import java.util.List;
import java.util.Map;
public class JsonEventSerializer implements EventSerializer
{
public static final String TYPE = "json";
private final ObjectMapper jsonMapper;
@JsonCreator
public JsonEventSerializer(@JacksonInject @Json ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
@Override
public byte[] serialize(List<Pair<String, Object>> event) throws JsonProcessingException
{
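// Pair order is irrelevant here: JSON objects are unordered, so the pairs collapse into a map.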
Map<String, Object> map = Maps.newHashMapWithExpectedSize(event.size());
event.forEach(pair -> map.put(pair.lhs, pair.rhs));
return jsonMapper.writeValueAsBytes(map);
}
@Override
public void close()
{
}
}

View File

@@ -31,12 +31,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaEventWriter implements StreamEventWriter
{
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
private final KafkaProducer<String, String> producer;
private final KafkaProducer<String, byte[]> producer;
private final boolean txnEnabled;
private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();
@@ -57,7 +58,7 @@ public class KafkaEventWriter implements StreamEventWriter
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
new StringSerializer()
new ByteArraySerializer()
);
if (txnEnabled) {
producer.initTransactions();
@@ -91,25 +92,42 @@ public class KafkaEventWriter implements StreamEventWriter
}
@Override
public void write(String topic, String event)
public void write(String topic, byte[] event)
{
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, event));
pendingWriteRecords.add(future);
}
@Override
public void shutdown()
public void close()
{
flush();
producer.close();
}
@Override
public void flush() throws Exception
public void flush()
{
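// Wait for every pending send; keep the first failure and attach later failures as
// suppressed exceptions so a single bad record does not hide the rest.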
Exception e = null;
for (Future<RecordMetadata> future : pendingWriteRecords) {
future.get();
try {
future.get();
}
catch (InterruptedException | ExecutionException ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (e == null) {
e = ex;
} else {
e.addSuppressed(ex);
}
}
}
pendingWriteRecords.clear();
if (e != null) {
throw new RuntimeException(e);
}
}
private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)

View File

@@ -25,17 +25,13 @@ import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.util.AwsHostNameUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
public class KinesisEventWriter implements StreamEventWriter
{
private static final Logger LOG = new Logger(KinesisEventWriter.class);
private final KinesisProducer kinesisProducer;
public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception
@@ -82,20 +78,19 @@
}
@Override
public void write(String streamName, String event)
public void write(String streamName, byte[] event)
{
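// The SHA-1 hex of the payload serves as the partition key, spreading records across shards.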
kinesisProducer.addUserRecord(
streamName,
DigestUtils.sha1Hex(event),
ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8))
ByteBuffer.wrap(event)
);
}
@Override
public void shutdown()
public void close()
{
LOG.info("Shutting down Kinesis client");
kinesisProducer.flushSync();
flush();
}
@Override

View File

@@ -19,10 +19,13 @@
package org.apache.druid.testing.utils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.testng.ITestResult;
import org.testng.TestListenerAdapter;
import java.util.Arrays;
public class LoggerListener extends TestListenerAdapter
{
private static final Logger LOG = new Logger(LoggerListener.class);
@@ -30,25 +33,38 @@ public class LoggerListener extends TestListenerAdapter
@Override
public void onTestFailure(ITestResult tr)
{
LOG.info("[%s] -- Test method failed", tr.getName());
LOG.error(tr.getThrowable(), "Failed %s", formatTestName(tr));
}
@Override
public void onTestSkipped(ITestResult tr)
{
LOG.info("[%s] -- Test method skipped", tr.getName());
LOG.warn("Skipped %s", formatTestName(tr));
}
@Override
public void onTestSuccess(ITestResult tr)
{
LOG.info("[%s] -- Test method passed", tr.getName());
LOG.info("Passed %s", formatTestName(tr));
}
@Override
public void onTestStart(ITestResult tr)
{
LOG.info("[%s] -- TEST START", tr.getName());
LOG.info("Starting %s", formatTestName(tr));
}
private static String formatTestName(ITestResult tr)
{
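// Include the class name and any parameters so parameterized runs are distinguishable in logs.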
if (tr.getParameters().length == 0) {
return StringUtils.format("[%s.%s]", tr.getTestClass().getName(), tr.getName());
} else {
return StringUtils.format(
"[%s.%s] with parameters %s",
tr.getTestClass().getName(),
tr.getName(),
Arrays.toString(tr.getParameters())
);
}
}
}

View File

@@ -20,21 +20,32 @@
package org.apache.druid.testing.utils;
import java.io.Closeable;
/**
* This interface is used to write test event data to the underlying stream (such as Kafka or Kinesis).
* It can also be used with {@link StreamGenerator} to write a particular set of test data.
*/
public interface StreamEventWriter
public interface StreamEventWriter extends Closeable
{
void write(String topic, String event);
void shutdown();
void flush() throws Exception;
boolean isTransactionEnabled();
void initTransaction();
void commitTransaction();
void write(String topic, byte[] event);
/**
* Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
* Note that this method is not interruptible.
*/
void flush();
/**
* Close this writer. Any resources should be cleaned up when this method is called.
* Implementations must call {@link #flush()} before closing the writer.
*/
@Override
void close();
}
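
For reference, a minimal sketch of an implementation honoring this contract (InMemoryStreamEventWriter is hypothetical, not part of this commit): close() delegates to flush(), and transactions are unsupported:

package org.apache.druid.testing.utils;
import java.util.ArrayList;
import java.util.List;
public class InMemoryStreamEventWriter implements StreamEventWriter
{
  private final List<byte[]> pending = new ArrayList<>();
  private final List<byte[]> committed = new ArrayList<>();

  @Override
  public void write(String topic, byte[] event)
  {
    pending.add(event); // the topic is ignored in this sketch
  }

  @Override
  public void flush()
  {
    // Synchronous: all pending writes are visible once this returns.
    committed.addAll(pending);
    pending.clear();
  }

  @Override
  public void close()
  {
    flush(); // the contract requires flushing before close
  }

  @Override
  public boolean isTransactionEnabled()
  {
    return false;
  }

  @Override
  public void initTransaction()
  {
    throw new UnsupportedOperationException("transactions are not supported");
  }

  @Override
  public void commitTransaction()
  {
    throw new UnsupportedOperationException("transactions are not supported");
  }
}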

View File

@@ -26,6 +26,4 @@ public interface StreamGenerator
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
void shutdown();
}

View File

@@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonProperty;
public class StreamVerifierSyntheticEvent
{
private String id;
private long groupingTimestamp;
private long insertionTimestamp;
private long sequenceNumber;
private Long expectedSequenceNumberSum;
private boolean firstEvent;
public StreamVerifierSyntheticEvent(
String id,
long groupingTimestamp,
long insertionTimestamp,
long sequenceNumber,
Long expectedSequenceNumberSum,
boolean firstEvent
)
{
this.id = id;
this.groupingTimestamp = groupingTimestamp;
this.insertionTimestamp = insertionTimestamp;
this.sequenceNumber = sequenceNumber;
this.expectedSequenceNumberSum = expectedSequenceNumberSum;
this.firstEvent = firstEvent;
}
@JsonProperty
public String getId()
{
return id;
}
@JsonProperty
public long getGroupingTimestamp()
{
return groupingTimestamp;
}
@JsonProperty
public long getInsertionTimestamp()
{
return insertionTimestamp;
}
@JsonProperty
public long getSequenceNumber()
{
return sequenceNumber;
}
@JsonProperty
public Long getExpectedSequenceNumberSum()
{
return expectedSequenceNumberSum;
}
@JsonProperty
public Integer getFirstEventFlag()
{
return firstEvent ? 1 : null;
}
public static StreamVerifierSyntheticEvent of(
String id,
long groupingTimestamp,
long insertionTimestamp,
long sequenceNumber,
Long expectedSequenceNumberSum,
boolean firstEvent
)
{
return new StreamVerifierSyntheticEvent(
id,
groupingTimestamp,
insertionTimestamp,
sequenceNumber,
expectedSequenceNumberSum,
firstEvent
);
}
}

View File

@@ -19,32 +19,18 @@
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.DateTime;
import java.util.List;
public abstract class SyntheticStreamGenerator implements StreamGenerator
{
private static final Logger log = new Logger(SyntheticStreamGenerator.class);
static final ObjectMapper MAPPER = new DefaultObjectMapper();
static {
MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(ObjectMapper.class.getName(), MAPPER)
);
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
public int getEventsPerSecond()
{
return eventsPerSecond;
}
private static final Logger LOG = new Logger(SyntheticStreamGenerator.class);
private final EventSerializer serializer;
private final int eventsPerSecond;
// When calculating rates, leave this buffer to minimize overruns where we're still writing messages from the previous
@@ -52,13 +38,14 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
// second to begin.
private final long cyclePaddingMs;
public SyntheticStreamGenerator(int eventsPerSecond, long cyclePaddingMs)
public SyntheticStreamGenerator(EventSerializer serializer, int eventsPerSecond, long cyclePaddingMs)
{
this.serializer = serializer;
this.eventsPerSecond = eventsPerSecond;
this.cyclePaddingMs = cyclePaddingMs;
}
abstract Object getEvent(int row, DateTime timestamp);
abstract List<Pair<String, Object>> newEvent(int row, DateTime timestamp);
@Override
public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
@@ -83,12 +70,12 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
try {
long sleepMillis = nowCeilingToSecond.getMillis() - DateTimes.nowUtc().getMillis();
if (sleepMillis > 0) {
log.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
LOG.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
Thread.sleep(sleepMillis);
continue;
}
log.info(
LOG.info(
"Beginning run cycle with %s events, target completion time: %s",
eventsPerSecond,
nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
@@ -99,11 +86,11 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
}
for (int i = 1; i <= eventsPerSecond; i++) {
streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp)));
streamEventWriter.write(streamTopic, serializer.serialize(newEvent(i, eventTimestamp)));
long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond);
if ((i <= 100 && i % 10 == 0) || i % 100 == 0) {
log.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
LOG.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
}
if (sleepTime > 0) {
@@ -119,7 +106,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
eventTimestamp = eventTimestamp.plusSeconds(1);
seconds++;
log.info(
LOG.info(
"Finished writing %s events, current time: %s - updating next timestamp to: %s",
eventsPerSecond,
DateTimes.nowUtc(),
@@ -128,7 +115,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
if (seconds >= totalNumberOfSeconds) {
streamEventWriter.flush();
log.info(
LOG.info(
"Finished writing %s seconds",
seconds
);
@@ -141,11 +128,6 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
}
}
@Override
public void shutdown()
{
}
/**
* Dynamically adjust delay between messages to spread them out over the remaining time left in the second.
*/

View File

@@ -19,42 +19,44 @@
package org.apache.druid.testing.utils;
import org.apache.druid.java.util.common.Pair;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
{
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs)
public WikipediaStreamEventStreamGenerator(EventSerializer serializer, int eventsPerSeconds, long cyclePaddingMs)
{
super(eventsPerSeconds, cyclePaddingMs);
super(serializer, eventsPerSeconds, cyclePaddingMs);
}
@Override
Object getEvent(int i, DateTime timestamp)
List<Pair<String, Object>> newEvent(int i, DateTime timestamp)
{
Map<String, Object> event = new HashMap<>();
event.put("page", "Gypsy Danger");
event.put("language", "en");
event.put("user", "nuclear");
event.put("unpatrolled", "true");
event.put("newPage", "true");
event.put("robot", "false");
event.put("anonymous", "false");
event.put("namespace", "article");
event.put("continent", "North America");
event.put("country", "United States");
event.put("region", "Bay Area");
event.put("city", "San Francisco");
event.put("timestamp", DATE_TIME_FORMATTER.print(timestamp));
event.put("added", i);
event.put("deleted", 0);
event.put("delta", i);
return event;
List<Pair<String, Object>> event = new ArrayList<>();
event.add(Pair.of("timestamp", DATE_TIME_FORMATTER.print(timestamp)));
event.add(Pair.of("page", "Gypsy Danger"));
event.add(Pair.of("language", "en"));
event.add(Pair.of("user", "nuclear"));
event.add(Pair.of("unpatrolled", "true"));
event.add(Pair.of("newPage", "true"));
event.add(Pair.of("robot", "false"));
event.add(Pair.of("anonymous", "false"));
event.add(Pair.of("namespace", "article"));
event.add(Pair.of("continent", "North America"));
event.add(Pair.of("country", "United States"));
event.add(Pair.of("region", "Bay Area"));
event.add(Pair.of("city", "San Francisco"));
event.add(Pair.of("added", i));
event.add(Pair.of("deleted", 0));
event.add(Pair.of("delta", i));
return Collections.unmodifiableList(event);
}
}

View File

@@ -37,6 +37,8 @@ public class TestNGGroup
public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow";
public static final String KAFKA_DATA_FORMAT = "kafka-data-format";
public static final String OTHER_INDEX = "other-index";
public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
@@ -110,4 +112,12 @@
* Kinesis stream endpoint for a region must also be passed to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>
*/
public static final String KINESIS_INDEX = "kinesis-index";
/**
* This group is not part of CI. To run this group, AWS Kinesis configs/credentials must be
* provided in a file. The path of the file must then be passed to mvn with -Doverride.config.path=<PATH_TO_FILE>.
* See integration-tests/docker/environment-configs/override-examples/kinesis for env vars to provide.
* The Kinesis stream endpoint for a region must also be passed to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>.
*/
public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";
}

View File

@@ -33,17 +33,19 @@ import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
public abstract class AbstractIndexerTest
{
@Inject
protected CoordinatorResourceTestClient coordinator;
@Inject
@@ -109,15 +111,33 @@ public abstract class AbstractIndexerTest
);
}
protected String getResourceAsString(String file) throws IOException
public static String getResourceAsString(String file) throws IOException
{
final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
try {
try (final InputStream inputStream = getResourceAsStream(file)) {
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
finally {
IOUtils.closeQuietly(inputStream);
}
}
public static InputStream getResourceAsStream(String resource)
{
return ITRealtimeIndexTaskTest.class.getResourceAsStream(resource);
}
public static List<String> listResources(String dir) throws IOException
{
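// A classpath "directory" resource is read as a stream whose lines are the entry names
// (this relies on resources being exploded on disk rather than packaged in a jar).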
List<String> resources = new ArrayList<>();
try (
InputStream in = getResourceAsStream(dir);
BufferedReader br = new BufferedReader(new InputStreamReader(in, StringUtils.UTF8_STRING))
) {
String resource;
while ((resource = br.readLine()) != null) {
resources.add(resource);
}
}
return resources;
}
}

View File

@@ -33,8 +33,6 @@ import java.util.function.Function;
public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
{
protected abstract boolean isKafkaWriterTransactionalEnabled();
@Override
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
{
@@ -42,15 +40,19 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
}
@Override
public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config)
public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
{
return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled());
return new KafkaEventWriter(config, transactionEnabled);
}
@Override
Function<String, String> generateStreamIngestionPropsTransform(String streamName,
String fullDatasourceName,
IntegrationTestingConfig config)
Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
IntegrationTestingConfig config
)
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
@@ -78,6 +80,29 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
"%%TOPIC_VALUE%%",
streamName
);
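// The template carries both %%INPUT_FORMAT%% and %%PARSER%% placeholders; fill in the one
// matching parserType and null out the other.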
if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
spec = StringUtils.replace(
spec,
"%%INPUT_FORMAT%%",
parserOrInputFormat
);
spec = StringUtils.replace(
spec,
"%%PARSER%%",
"null"
);
} else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
spec = StringUtils.replace(
spec,
"%%PARSER%%",
parserOrInputFormat
);
spec = StringUtils.replace(
spec,
"%%INPUT_FORMAT%%",
"null"
);
}
spec = StringUtils.replace(
spec,
"%%USE_EARLIEST_KEY%%",

View File

@@ -37,15 +37,20 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
}
@Override
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
throws Exception
{
return new KinesisEventWriter(config.getStreamEndpoint(), false);
}
@Override
Function<String, String> generateStreamIngestionPropsTransform(String streamName,
String fullDatasourceName,
IntegrationTestingConfig config)
Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
IntegrationTestingConfig config
)
{
return spec -> {
try {
@@ -69,6 +74,29 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
"%%TOPIC_VALUE%%",
streamName
);
if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
spec = StringUtils.replace(
spec,
"%%INPUT_FORMAT%%",
parserOrInputFormat
);
spec = StringUtils.replace(
spec,
"%%PARSER%%",
"null"
);
} else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
spec = StringUtils.replace(
spec,
"%%PARSER%%",
parserOrInputFormat
);
spec = StringUtils.replace(
spec,
"%%INPUT_FORMAT%%",
"null"
);
}
spec = StringUtils.replace(
spec,
"%%USE_EARLIEST_KEY%%",

View File

@@ -20,24 +20,34 @@
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.JsonEventSerializer;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testing.utils.StreamGenerator;
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
{
@@ -48,17 +58,32 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
static final int EVENTS_PER_SECOND = 6;
static final int TOTAL_NUMBER_OF_SECOND = 10;
private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
// Since this integration test can terminate or be killed unexpectedly, this tag is added to all streams created
// to help make stream cleanup easier. (Normally, streams should be cleaned up automatically by the teardown method.)
// The value of this tag is a timestamp that a lambda function can use to remove unused streams.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
private static final long CYCLE_PADDING_MS = 100;
private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
protected static final String SERIALIZER = "serializer";
protected static final String INPUT_FORMAT = "inputFormat";
protected static final String INPUT_ROW_PARSER = "parser";
private static final String JSON_INPUT_FORMAT_PATH =
String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json");
@Inject
private DruidClusterAdminClient druidClusterAdminClient;
@@ -67,92 +92,147 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private IntegrationTestingConfig config;
private StreamAdminClient streamAdminClient;
private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
String fullDatasourceName,
IntegrationTestingConfig config);
abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
throws Exception;
abstract Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
IntegrationTestingConfig config
);
abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
public abstract String getTestNamePrefix();
protected void doBeforeClass() throws Exception
{
streamAdminClient = createStreamAdminClient(config);
wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
}
protected void doClassTeardown()
private static String getOnlyResourcePath(String resourceRoot) throws IOException
{
wikipediaStreamEventGenerator.shutdown();
return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
}
protected void doTestIndexDataWithLegacyParserStableState() throws Exception
protected static List<String> listDataFormatResources() throws IOException
{
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
return listResources(DATA_RESOURCE_ROOT)
.stream()
.filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
.collect(Collectors.toList());
}
/**
* Returns a map from spec type key to the classpath location of the spec. The returned map contains at least 2 specs,
* one of which must be a {@link #SERIALIZER} spec.
*/
protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
{
final List<String> specDirs = listResources(resourceRoot);
final Map<String, String> map = new HashMap<>();
for (String eachSpec : specDirs) {
if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
} else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
} else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
}
}
if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
}
if (map.size() == 1) {
throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
}
return map;
}
private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
{
return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
}
protected void doTestIndexDataStableState(
boolean transactionEnabled,
String serializerPath,
String parserType,
String specPath
) throws Exception
{
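// The serializer spec is polymorphic JSON: Jackson resolves the concrete EventSerializer
// (json, csv, tsv, or avro) from its "type" field.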
final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
serializer,
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
try (
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
// Start data generator
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
TOTAL_NUMBER_OF_SECOND,
FIRST_EVENT_TIME
);
verifyIngestedData(generatedTestConfig);
}
finally {
doMethodTeardown(generatedTestConfig, streamEventWriter);
}
}
protected void doTestIndexDataWithInputFormatStableState() throws Exception
void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception
{
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartCoordinatorContainer(),
() -> druidClusterAdminClient.waitUntilCoordinatorReady(),
transactionEnabled
);
}
void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartIndexerContainer(),
() -> druidClusterAdminClient.waitUntilIndexerReady(),
transactionEnabled
);
}
void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartHistoricalContainer(),
() -> druidClusterAdminClient.waitUntilHistoricalReady(),
transactionEnabled
);
}
protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
// Start data generator
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
verifyIngestedData(generatedTestConfig);
}
finally {
doMethodTeardown(generatedTestConfig, streamEventWriter);
}
}
void doTestIndexDataWithLosingCoordinator() throws Exception
{
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
}
void doTestIndexDataWithLosingOverlord() throws Exception
{
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
}
void doTestIndexDataWithLosingHistorical() throws Exception
{
testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
}
protected void doTestIndexDataWithStartStopSupervisor() throws Exception
{
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
try (
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -161,7 +241,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before suspension
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -173,7 +263,12 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Suspend the supervisor
indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
// Start generating remaining half of the data
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);
// Resume the supervisor
indexer.resumeSupervisor(generatedTestConfig.getSupervisorId());
// Verify supervisor is healthy after suspension
@@ -187,31 +282,36 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig);
}
finally {
doMethodTeardown(generatedTestConfig, streamEventWriter);
}
}
protected void doTestIndexDataWithStreamReshardSplit() throws Exception
protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2);
testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2);
}
protected void doTestIndexDataWithStreamReshardMerge() throws Exception
protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2);
testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2);
}
private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
private void testIndexWithLosingNodeHelper(
Runnable restartRunnable,
Runnable waitForReadyRunnable,
boolean transactionEnabled
) throws Exception
{
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -220,7 +320,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before restart
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -236,13 +346,23 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start generating one third of the data (while restarting)
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateSecondRound,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);
// Wait for Druid process to be available
LOG.info("Waiting for Druid process to be available");
waitForReadyRunnable.run();
LOG.info("Druid process is now available");
// Start generating remaining data (after restarting)
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)
);
// Verify supervisor is healthy
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -254,19 +374,20 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Verify that supervisor ingested all data
verifyIngestedData(generatedTestConfig);
}
finally {
doMethodTeardown(generatedTestConfig, streamEventWriter);
}
}
private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception
private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception
{
StreamEventWriter streamEventWriter = createStreamEventWriter(config);
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -275,7 +396,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before resharding
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -289,7 +420,12 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start generating one third of the data (while resharding)
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateSecondRound,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);
// Wait for stream to finish resharding
ITRetryUtil.retryUntil(
() -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()),
@ -299,14 +435,23 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Waiting for stream to finish resharding"
);
ITRetryUtil.retryUntil(
() -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount),
() -> streamAdminClient.verfiyPartitionCountUpdated(
generatedTestConfig.getStreamName(),
STREAM_SHARD_COUNT,
newShardCount
),
true,
10000,
30,
"Waiting for stream to finish resharding"
);
// Start generating remaining data (after resharding)
wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)
);
// Verify supervisor is healthy after resharding
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@ -318,9 +463,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig);
}
finally {
doMethodTeardown(generatedTestConfig, streamEventWriter);
}
}
private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception
@ -329,11 +471,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS);
Thread.sleep(WAIT_TIME_MILLIS);
// Query data
final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE));
final String querySpec = generatedTestConfig.getStreamQueryPropsTransform()
.apply(getResourceAsString(QUERIES_FILE));
// this query will probably be answered by the indexing tasks, but possibly by 2 historical segments / 2 indexing tasks
this.queryHelper.testQueriesFromString(querySpec, 2);
LOG.info("Shutting down supervisor");
indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
// Clear the supervisor ID so doMethodTeardown does not shut it down again.
generatedTestConfig.setSupervisorId(null);
// wait for all indexing tasks to finish
LOG.info("Waiting for all indexing tasks to finish");
ITRetryUtil.retryUntilTrue(
@ -358,22 +503,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
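    // Closed form of the arithmetic series 1 + 2 + ... + numEvents.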
return (numEvents * (1 + numEvents)) / 2;
}
private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter)
private void doMethodTeardown(GeneratedTestConfig generatedTestConfig)
{
try {
streamEventWriter.flush();
streamEventWriter.shutdown();
}
catch (Exception e) {
// Best-effort cleanup, as the writer may have already been cleaned up
LOG.warn(e, "Failed to clean up writer. This might be expected depending on the test method");
}
try {
indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
}
catch (Exception e) {
// Best-effort cleanup, as the supervisor may have already been cleaned up
LOG.warn(e, "Failed to clean up supervisor. This might be expected depending on the test method");
if (generatedTestConfig.getSupervisorId() != null) {
try {
indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
}
catch (Exception e) {
// Best-effort cleanup, as the supervisor may have already been cleaned up
LOG.warn(e, "Failed to clean up supervisor. This might be expected depending on the test method");
}
}
try {
unloader(generatedTestConfig.getFullDatasourceName());
@ -393,17 +532,20 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private class GeneratedTestConfig
{
private String streamName;
private String fullDatasourceName;
private final String streamName;
private final String fullDatasourceName;
private String supervisorId;
private Function<String, String> streamIngestionPropsTransform;
private Function<String, String> streamQueryPropsTransform;
GeneratedTestConfig() throws Exception
GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
{
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
Map<String, String> tags = ImmutableMap.of(
STREAM_EXPIRE_TAG,
Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis())
);
streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags);
ITRetryUtil.retryUntil(
() -> streamAdminClient.isStreamActive(streamName),
@ -413,7 +555,13 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Wait for stream active"
);
fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config);
streamIngestionPropsTransform = generateStreamIngestionPropsTransform(
streamName,
fullDatasourceName,
parserType,
parserOrInputFormat,
config
);
streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
}
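Callers for other data formats presumably pass the matching resource paths to this constructor; a minimal sketch, where the CSV path is an illustrative assumption rather than a value taken from this change:

    // Hypothetical: a config that exercises the CSV input format.
    final GeneratedTestConfig csvConfig = new GeneratedTestConfig(
        INPUT_FORMAT,                                                           // parserType
        getResourceAsString("/stream/data/csv/input_format/input_format.json")  // hypothetical path
    );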

View File

@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -30,12 +29,6 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
{
@Override
protected boolean isKafkaWriterTransactionalEnabled()
{
return false;
}
@Override
public String getTestNamePrefix()
{
@ -48,19 +41,13 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test must be run individually since it affects and modifies the state of the Druid cluster
*/
@Test
public void testKafkaIndexDataWithLosingCoordinator() throws Exception
{
doTestIndexDataWithLosingCoordinator();
doTestIndexDataWithLosingCoordinator(false);
}
/**
@ -69,7 +56,7 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
@Test
public void testKafkaIndexDataWithLosingOverlord() throws Exception
{
doTestIndexDataWithLosingOverlord();
doTestIndexDataWithLosingOverlord(false);
}
/**
@ -78,6 +65,6 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
@Test
public void testKafkaIndexDataWithLosingHistorical() throws Exception
{
doTestIndexDataWithLosingHistorical();
doTestIndexDataWithLosingHistorical(false);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -30,12 +29,6 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
{
@Override
protected boolean isKafkaWriterTransactionalEnabled()
{
return true;
}
@Override
public String getTestNamePrefix()
{
@ -48,19 +41,13 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test must be run individually since it affects and modifies the state of the Druid cluster
*/
@Test
public void testKafkaIndexDataWithLosingCoordinator() throws Exception
{
doTestIndexDataWithLosingCoordinator();
doTestIndexDataWithLosingCoordinator(true);
}
/**
@ -69,7 +56,7 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
@Test
public void testKafkaIndexDataWithLosingOverlord() throws Exception
{
doTestIndexDataWithLosingOverlord();
doTestIndexDataWithLosingOverlord(true);
}
/**
@ -78,6 +65,6 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
@Test
public void testKafkaIndexDataWithLosingHistorical() throws Exception
{
doTestIndexDataWithLosingHistorical();
doTestIndexDataWithLosingHistorical(true);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -42,19 +41,13 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test must be run individually since it affects and modifies the state of the Druid cluster
*/
@Test
public void testKinesisIndexDataWithLosingCoordinator() throws Exception
{
doTestIndexDataWithLosingCoordinator();
doTestIndexDataWithLosingCoordinator(false);
}
/**
@ -63,7 +56,7 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
@Test
public void testKinesisIndexDataWithLosingOverlord() throws Exception
{
doTestIndexDataWithLosingOverlord();
doTestIndexDataWithLosingOverlord(false);
}
/**
@ -72,6 +65,6 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
@Test
public void testKinesisIndexDataWithLosingHistorical() throws Exception
{
doTestIndexDataWithLosingHistorical();
doTestIndexDataWithLosingHistorical(false);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.parallelized;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Test(groups = TestNGGroup.KAFKA_DATA_FORMAT)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingServiceTest
{
private static final boolean TRANSACTION_DISABLED = false;
private static final boolean TRANSACTION_ENABLED = true;
/**
* Generates test parameters based on the given resources. The resources should be structured as
*
* <pre>{@code
* {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
* /input_format
* /parser
* }</pre>
*
* The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and
 * must be present. At least one of the {@code input_format} or {@code parser} directories must be present as well.
*/
@DataProvider(parallel = true)
public static Object[][] resources() throws IOException
{
final List<Object[]> resources = new ArrayList<>();
final List<String> dataFormats = listDataFormatResources();
for (String eachFormat : dataFormats) {
final Map<String, String> spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat));
final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER);
spec.forEach((k, path) -> {
if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) {
resources.add(new Object[]{TRANSACTION_DISABLED, serializerPath, k, path});
resources.add(new Object[]{TRANSACTION_ENABLED, serializerPath, k, path});
}
});
}
return resources.toArray(new Object[0][]);
}
@Inject
private @Json ObjectMapper jsonMapper;
@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}
@Test(dataProvider = "resources")
public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath)
throws Exception
{
doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath);
}
@Override
public String getTestNamePrefix()
{
return "kafka_data_format";
}
}
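To make the data-provider contract concrete: for a hypothetical csv format directory containing serializer, input_format, and parser specs, each non-serializer entry expands into one case per transaction mode, roughly as follows (paths illustrative):

    // { transactionEnabled, serializerPath, parserType, specPath }
    { false, ".../csv/serializer/serializer.json", "input_format", ".../csv/input_format/input_format.json" }
    { true,  ".../csv/serializer/serializer.json", "input_format", ".../csv/input_format/input_format.json" }
    { false, ".../csv/serializer/serializer.json", "parser",       ".../csv/parser/parser.json" }
    { true,  ".../csv/serializer/serializer.json", "parser",       ".../csv/parser/parser.json" }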

View File

@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -31,12 +30,6 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
{
@Override
protected boolean isKafkaWriterTransactionalEnabled()
{
return false;
}
@Override
public String getTestNamePrefix()
{
@ -49,32 +42,6 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
{
doTestIndexDataWithLegacyParserStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKafkaIndexDataWithInputFormatStableState() throws Exception
{
doTestIndexDataWithInputFormatStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
@ -82,7 +49,7 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
@Test
public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
{
doTestIndexDataWithStartStopSupervisor();
doTestIndexDataWithStartStopSupervisor(false);
}
/**
@ -92,6 +59,6 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
@Test
public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit();
doTestIndexDataWithStreamReshardSplit(false);
}
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -31,12 +30,6 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
{
@Override
protected boolean isKafkaWriterTransactionalEnabled()
{
return true;
}
@Override
public String getTestNamePrefix()
{
@ -49,32 +42,6 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
{
doTestIndexDataWithLegacyParserStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKafkaIndexDataWithInputFormatStableState() throws Exception
{
doTestIndexDataWithInputFormatStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
@ -82,7 +49,7 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
@Test
public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
{
doTestIndexDataWithStartStopSupervisor();
doTestIndexDataWithStartStopSupervisor(true);
}
/**
@ -92,6 +59,6 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
@Test
public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit();
doTestIndexDataWithStreamReshardSplit(true);
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.parallelized;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Test(groups = TestNGGroup.KINESIS_DATA_FORMAT)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKinesisIndexingServiceDataFormatTest extends AbstractKinesisIndexingServiceTest
{
/**
* Generates test parameters based on the given resources. The resources should be structured as
*
* <pre>{@code
* {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
* /input_format
* /parser
* }</pre>
*
* The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and
 * must be present. At least one of the {@code input_format} or {@code parser} directories must be present as well.
*/
@DataProvider(parallel = true)
public static Object[][] resources() throws IOException
{
final List<Object[]> resources = new ArrayList<>();
final List<String> dataFormats = listDataFormatResources();
for (String eachFormat : dataFormats) {
final Map<String, String> spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat));
final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER);
spec.forEach((k, path) -> {
if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) {
resources.add(new Object[]{serializerPath, k, path});
}
});
}
return resources.toArray(new Object[0][]);
}
@Inject
private @Json ObjectMapper jsonMapper;
@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}
@Test(dataProvider = "resources")
public void testIndexData(String serializerPath, String parserType, String specPath)
throws Exception
{
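    // Transactions are a Kafka-specific concept with no Kinesis counterpart,
    // so transactionEnabled is always passed as false here.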
doTestIndexDataStableState(false, serializerPath, parserType, specPath);
}
@Override
public String getTestNamePrefix()
{
return "kinesis_data_format";
}
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -43,32 +42,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
doBeforeClass();
}
@AfterClass
public void tearDown()
{
doClassTeardown();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKinesisIndexDataWithLegacyParserStableState() throws Exception
{
doTestIndexDataWithLegacyParserStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
*/
@Test
public void testKinesisIndexDataWithInputFormatStableState() throws Exception
{
doTestIndexDataWithInputFormatStableState();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
* and supervisor that are maintained and scoped within this test only
@ -76,7 +49,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
@Test
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
{
doTestIndexDataWithStartStopSupervisor();
doTestIndexDataWithStartStopSupervisor(false);
}
/**
@ -86,7 +59,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
@Test
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit();
doTestIndexDataWithStreamReshardSplit(false);
}
/**
@ -96,6 +69,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
@Test
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
{
doTestIndexDataWithStreamReshardMerge();
doTestIndexDataWithStreamReshardMerge(false);
}
}

View File

@ -1,61 +0,0 @@
{
"type": "%%STREAM_TYPE%%",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "%%STREAM_TYPE%%",
"intermediatePersistPeriod": "PT30S",
"maxRowsPerSegment": 5000000,
"maxRowsInMemory": 500000
},
"ioConfig": {
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
"taskCount": 2,
"replicas": 1,
"taskDuration": "PT5M",
"%%USE_EARLIEST_KEY%%": true
}
}

View File

@ -0,0 +1,39 @@
{
"type": "avro_stream",
"avroBytesDecoder" : {
"type": "schema_inline",
"schema": {
"namespace": "org.apache.druid",
"name": "wikipedia",
"type": "record",
"fields": [
{ "name": "timestamp", "type": "string" },
{ "name": "page", "type": "string" },
{ "name": "language", "type": "string" },
{ "name": "user", "type": "string" },
{ "name": "unpatrolled", "type": "string" },
{ "name": "newPage", "type": "string" },
{ "name": "robot", "type": "string" },
{ "name": "anonymous", "type": "string" },
{ "name": "namespace", "type": "string" },
{ "name": "continent", "type": "string" },
{ "name": "country", "type": "string" },
{ "name": "region", "type": "string" },
{ "name": "city", "type": "string" },
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
}
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "avro"
}

View File

@ -0,0 +1,4 @@
{
"type" : "csv",
"columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"]
}

View File

@ -0,0 +1,16 @@
{
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "csv"
}

View File

@ -0,0 +1,3 @@
{
"type" : "json"
}

View File

@ -0,0 +1,15 @@
{
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "json"
}

View File

@ -2,6 +2,7 @@
"type": "%%STREAM_TYPE%%",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": %%PARSER%%,
"timestampSpec": {
"column": "timestamp",
"format": "auto"
@ -51,8 +52,6 @@
"replicas": 1,
"taskDuration": "PT5M",
"%%USE_EARLIEST_KEY%%": true,
"inputFormat" : {
"type" : "json"
}
"inputFormat" : %%INPUT_FORMAT%%
}
}
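The %%PARSER%% and %%INPUT_FORMAT%% placeholders are filled in by the stream-ingestion props transform before the spec is submitted. A minimal sketch of that substitution, assuming plain string replacement and illustrative variable names:

    // Hypothetical: splice the per-format specs into the template.
    String spec = getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH);
    spec = spec.replace("%%PARSER%%", parserJson);             // e.g. contents of parser/parser.json
    spec = spec.replace("%%INPUT_FORMAT%%", inputFormatJson);  // e.g. contents of input_format/input_format.json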

View File

@ -0,0 +1,4 @@
{
"type" : "tsv",
"columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"]
}

View File

@ -0,0 +1,16 @@
{
"type": "string",
"parseSpec": {
"format": "tsv",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "tsv"
}

View File

@ -20,7 +20,7 @@
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="IntegrationTestSuite">
<suite name="IntegrationTestSuite" data-provider-thread-count="2">
<listeners>
<listener class-name="org.apache.druid.testing.utils.LoggerListener" />
<listener class-name="org.apache.druid.testing.utils.SuiteListener" />