Add integration test for protobuf (#11126)

* add file test

* test

* for test

* bug fixed

* test

* test

* test

* bug fixed

* delete auto scaler

* add input format

* add extensions

* bug fixed

* bug fixed

* bug fixed

* revert

* add schema registry test

* bug fixed

* bug fixed

* delete desc

* delete change

* add desc

* bug fixed

* test inputformat

* bug fixed

* bug fixed

* bug fixed

* bug fixed

* delete io exception

* change builder not static

* change pom

* bug fixed

Co-authored-by: yuanyi <yuanyi@freewheel.tv>
This commit is contained in:
Yi Yuan 2021-05-18 06:45:07 +08:00 committed by GitHub
parent 6d08a7051e
commit 3be8e29269
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 288 additions and 4 deletions

View File

@ -26,6 +26,7 @@ RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh
FROM druidbase
ARG MYSQL_VERSION
ARG CONFLUENT_VERSION
# Verify Java version
RUN java -version
@ -46,6 +47,9 @@ ADD lib/* /usr/local/druid/lib/
RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar
RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
-O /usr/local/druid/lib/kafka-protobuf-provider.jar
# Add sample data
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \

Binary file not shown.

View File

@ -120,6 +120,12 @@
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-protobuf-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
@ -366,6 +372,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>5.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.0</version>
</dependency>
<!-- Tests -->
<dependency>
@ -387,6 +404,12 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
@ -470,6 +493,7 @@
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
<MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
<CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
<ZK_VERSION>${zk.version}</ZK_VERSION>
</environmentVariables>

View File

@ -80,6 +80,7 @@ fi
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json
cp docker/test-data/wikipedia.desc $SHARED_DIR/wikiticker-it/wikipedia.desc
# copy other files if needed
if [ -n "$DRUID_INTEGRATION_TEST_RESOURCE_FILE_DIR_PATH" ]

View File

@ -22,17 +22,17 @@ set -e
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8)
echo "Build druid-cluster with Java 8"
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
11)
echo "Build druid-cluster with Java 11"
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;;
*)
echo "Invalid JVM Runtime given. Stopping"

View File

@ -43,7 +43,9 @@ import java.util.List;
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
@Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
@Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
@Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class),
@Type(name = ProtobufEventSerializer.TYPE, value = ProtobufEventSerializer.class),
@Type(name = ProtobufSchemaRegistryEventSerializer.TYPE, value = ProtobufSchemaRegistryEventSerializer.class)
})
public interface EventSerializer extends Closeable
{

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.github.os72.protobuf.dynamic.MessageDefinition;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.List;
public class ProtobufEventSerializer implements EventSerializer
{
public static final String TYPE = "protobuf";
private static final Logger LOGGER = new Logger(ProtobufEventSerializer.class);
public static final DynamicSchema SCHEMA;
static {
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
MessageDefinition wikiDef = MessageDefinition.newBuilder("Wikipedia")
.addField("optional", "string", "timestamp", 1)
.addField("optional", "string", "page", 2)
.addField("optional", "string", "language", 3)
.addField("optional", "string", "user", 4)
.addField("optional", "string", "unpatrolled", 5)
.addField("optional", "string", "newPage", 6)
.addField("optional", "string", "robot", 7)
.addField("optional", "string", "anonymous", 8)
.addField("optional", "string", "namespace", 9)
.addField("optional", "string", "continent", 10)
.addField("optional", "string", "country", 11)
.addField("optional", "string", "region", 12)
.addField("optional", "string", "city", 13)
.addField("optional", "int32", "added", 14)
.addField("optional", "int32", "deleted", 15)
.addField("optional", "int32", "delta", 16)
.build();
schemaBuilder.addMessageDefinition(wikiDef);
DynamicSchema schema = null;
try {
schema = schemaBuilder.build();
}
catch (Descriptors.DescriptorValidationException e) {
LOGGER.error("Could not init protobuf schema.");
}
SCHEMA = schema;
}
@Override
public byte[] serialize(List<Pair<String, Object>> event)
{
DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
for (Pair<String, Object> pair : event) {
builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
}
return builder.build().toByteArray();
}
@Override
public void close()
{
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import java.nio.ByteBuffer;
import java.util.List;
public class ProtobufSchemaRegistryEventSerializer extends ProtobufEventSerializer
{
private static final int MAX_INITIALIZE_RETRIES = 10;
public static final String TYPE = "protobuf-schema-registry";
private final IntegrationTestingConfig config;
private final CachedSchemaRegistryClient client;
private int schemaId = -1;
@JsonCreator
public ProtobufSchemaRegistryEventSerializer(
@JacksonInject IntegrationTestingConfig config
)
{
this.config = config;
this.client = new CachedSchemaRegistryClient(
StringUtils.format("http://%s", config.getSchemaRegistryHost()),
Integer.MAX_VALUE,
ImmutableMap.of(
"basic.auth.credentials.source", "USER_INFO",
"basic.auth.user.info", "druid:diurd"
),
ImmutableMap.of()
);
}
@Override
public void initialize(String topic)
{
try {
RetryUtils.retry(
() -> {
schemaId = client.register(topic, new ProtobufSchema(ProtobufEventSerializer.SCHEMA.newMessageBuilder("Wikipedia").getDescriptorForType()));
return 0;
},
(e) -> true,
MAX_INITIALIZE_RETRIES
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] serialize(List<Pair<String, Object>> event)
{
DynamicMessage.Builder builder = SCHEMA.newMessageBuilder("Wikipedia");
Descriptors.Descriptor msgDesc = builder.getDescriptorForType();
for (Pair<String, Object> pair : event) {
builder.setField(msgDesc.findFieldByName(pair.lhs), pair.rhs);
}
byte[] bytes = builder.build().toByteArray();
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(schemaId).put((byte) 0).put(bytes);
bb.rewind();
return bb.array();
}
}

View File

@ -0,0 +1,12 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
"protoMessageType": "Wikipedia"
},
"flattenSpec": {
"useFieldDiscovery": true
},
"binaryAsString": false
}

View File

@ -0,0 +1,18 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///shared/wikiticker-it/wikipedia.desc",
"protoMessageType": "Wikipedia"
},
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "protobuf"
}

View File

@ -0,0 +1,15 @@
{
"type": "protobuf",
"protoBytesDecoder": {
"type": "schema_registry",
"url": "%%SCHEMA_REGISTRY_HOST%%",
"config": {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "druid:diurd"
}
},
"flattenSpec": {
"useFieldDiscovery": true
},
"binaryAsString": false
}

View File

@ -0,0 +1,21 @@
{
"type": "protobuf",
"protoBytesDecoder" : {
"type": "schema_registry",
"url": "%%SCHEMA_REGISTRY_HOST%%",
"config": {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "druid:diurd"
}
},
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
}
}
}

View File

@ -0,0 +1,3 @@
{
"type": "protobuf-schema-registry"
}