[Feature] Thrift support for realtime and batch ingestion (#3418)

* Thrift ingestion plugin

1. The thrift compiler binary is platform dependent, so Scrooge is used to generate the Java files and avoid style-check failures.
2. Both stream and Hadoop ingestion are supported; the input format can be a SequenceFile or an LZO thrift block file.
3. Base64- and protocol-aware deserialization.

change header

* fix conflicts in pom
Ninglin Du 2016-12-14 02:05:15 +08:00 committed by Fangjin Yang
parent 48b22e261a
commit 469ab21091
11 changed files with 732 additions and 0 deletions


@@ -0,0 +1,104 @@
---
layout: doc_page
---
# Thrift
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-thrift-extensions`.
This extension enables Druid to ingest Thrift data online (`ByteBuffer`) and offline (SequenceFiles of type `<Writable, BytesWritable>` or LzoThriftBlock files).
If you want to use another version of Thrift, change the dependency in the pom and compile the extension yourself.
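A minimal sketch of such a rebuild (the `0.9.2` below is only an illustrative version; `thrift.version` is the property defined in the extension's pom):
```bash
# Rebuild only this module against a different Thrift release by overriding
# the thrift.version property on the command line.
mvn -pl extensions-contrib/thrift-extensions -am clean package -Dthrift.version=0.9.2
```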
## Thrift Parser
| Field | Type | Description | Required |
| ----------- | ----------- | ---------------------------------------- | -------- |
| type | String | This should say `thrift` | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a Json parseSpec. | yes |
| thriftJar | String | Path of the Thrift jar. If not provided, the parser looks for the Thrift class on the classpath. For batch ingestion, the jar should be uploaded to HDFS first and `jobProperties` configured with `"tmpjars":"/path/to/your/thrift.jar"`. | no |
| thriftClass | String | Classname of the Thrift object | yes |
- Realtime Ingestion (Tranquility example)
```json
{
  "dataSources": [{
    "spec": {
      "dataSchema": {
        "dataSource": "book",
        "granularitySpec": { },
        "parser": {
          "type": "thrift",
          "thriftClass": "io.druid.data.input.thrift.Book",
          "protocol": "compact",
          "parseSpec": {
            "format": "json",
            ...
          }
        },
        "metricsSpec": [...]
      },
      "tuningConfig": {...}
    },
    "properties": {...}
  }],
  "properties": {...}
}
```
To use it with Tranquility:
```bash
bin/tranquility kafka \
  -configFile $jsonConfig \
  -Ddruid.extensions.directory=/path/to/extensions \
  -Ddruid.extensions.loadList='["druid-thrift-extensions"]'
```
`hadoop-client` is also needed; to keep things simple, you may copy all of its dependency jars into the `druid-thrift-extensions` directory, as sketched below.
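For example (a sketch only; `HADOOP_HOME`, the jar layout, and the extensions path are assumptions about your installation):
```bash
# Copy the hadoop-client jars and their dependencies into the extension directory.
cp ${HADOOP_HOME}/share/hadoop/common/*.jar \
   ${HADOOP_HOME}/share/hadoop/common/lib/*.jar \
   /path/to/druid/extensions/druid-thrift-extensions/
```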
- Batch Ingestion - `inputFormat` and `tmpjars` should be set.
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` can be either `org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat` or `com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat`. Be careful: when `LzoThriftBlockInputFormat` is used, the Thrift class must be provided twice, once in the parser and again in `jobProperties` via `elephantbird.class.for.MultiInputFormat` (see the commented entry in the example below).
```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "book",
      "parser": {
        "type": "thrift",
        "thriftJar": "book.jar",
        "thriftClass": "io.druid.data.input.thrift.Book",
        "protocol": "compact",
        "parseSpec": {
          "format": "json",
          ...
        }
      },
      "metricsSpec": [],
      "granularitySpec": {}
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
        // "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat",
        "paths": "/user/to/some/book.seq"
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "tmpjars": "/user/h_user_profile/du00/druid/test/book.jar",
        // "elephantbird.class.for.MultiInputFormat" : "${YOUR_THRIFT_CLASS_NAME}"
      }
    }
  }
}
```


@@ -63,6 +63,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
|druid-thrift-extensions|Thrift ingestion support|[link](../development/extensions-contrib/thrift.html)|
## Promoting Community Extension to Core Extension


@@ -0,0 +1,65 @@
{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "test",
      "parser" : {
        "type" : "thrift",
        "thriftJar" : "example/book.jar",
        "thriftClass" : "io.druid.data.input.thrift.Book",
        "protocol" : "compact",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "date",
            "format" : "auto"
          },
          "flattenSpec" : {
            "useFieldDiscovery" : true,
            "fields" : [ {
              "type" : "path",
              "name" : "lastName",
              "expr" : "$.author.lastName"
            }, "title" ]
          },
          "dimensionsSpec" : {
            "dimensions" : [ "title", "lastName" ]
          }
        }
      },
      "metricsSpec" : [ {
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "cost",
        "fieldName" : "price"
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "DAY",
        "intervals" : [ "2015-09-01/2015-10-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "inputFormat" : "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
        "paths" : "example/book.seq"
      }
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {
        "tmpjars": "/user/xxx/druid/test/book.jar"
      }
    }
  },
  "dataSource" : "test"
}


@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.druid.extensions.contrib</groupId>
  <artifactId>druid-thrift-extensions</artifactId>
  <name>druid-thrift-extensions</name>
  <description>druid-thrift-extensions</description>

  <parent>
    <artifactId>druid</artifactId>
    <groupId>io.druid</groupId>
    <version>0.9.3-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <properties>
    <thrift.version>0.9.3</thrift.version>
    <elephantbird.version>4.8</elephantbird.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>${thrift.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>commons-logging</artifactId>
          <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
          <artifactId>httpclient</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.druid</groupId>
      <artifactId>druid-indexing-hadoop</artifactId>
      <version>${project.parent.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.twitter.elephantbird</groupId>
      <artifactId>elephant-bird-core</artifactId>
      <version>${elephantbird.version}</version>
    </dependency>
    <dependency>
      <groupId>com.twitter.elephantbird</groupId>
      <artifactId>elephant-bird-hadoop-compat</artifactId>
      <version>${elephantbird.version}</version>
    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>scrooge-core_2.11</artifactId>
      <version>4.10.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <!-- The thrift compiler binary is platform dependent, so the Java test
             classes are generated with Scrooge at build time instead of being
             checked in. -->
        <groupId>com.twitter</groupId>
        <artifactId>scrooge-maven-plugin</artifactId>
        <version>4.11.0</version>
        <configuration>
          <language>java</language>
        </configuration>
        <executions>
          <execution>
            <id>thrift-test-sources</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>


@@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.thrift;

import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Base64;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TProtocolUtil;
import org.apache.thrift.protocol.TSimpleJSONProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThriftDeserialization
{
  private static final Logger log = LoggerFactory.getLogger(ThriftDeserialization.class);
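
  // TSerializer and TDeserializer are not thread-safe, so a separate instance is kept per thread.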
  private static final ThreadLocal<TDeserializer> DESERIALIZER_COMPACT = new ThreadLocal<TDeserializer>()
  {
    @Override
    protected TDeserializer initialValue()
    {
      return new TDeserializer(new TCompactProtocol.Factory());
    }
  };

  private static final ThreadLocal<TDeserializer> DESERIALIZER_BINARY = new ThreadLocal<TDeserializer>()
  {
    @Override
    protected TDeserializer initialValue()
    {
      return new TDeserializer(new TBinaryProtocol.Factory());
    }
  };

  private static final ThreadLocal<TDeserializer> DESERIALIZER_JSON = new ThreadLocal<TDeserializer>()
  {
    @Override
    protected TDeserializer initialValue()
    {
      return new TDeserializer(new TJSONProtocol.Factory());
    }
  };

  public static final ThreadLocal<TSerializer> SERIALIZER_SIMPLE_JSON = new ThreadLocal<TSerializer>()
  {
    @Override
    protected TSerializer initialValue()
    {
      return new TSerializer(new TSimpleJSONProtocol.Factory());
    }
  };

  private static final byte[] EMPTY_BYTES = new byte[0];

  public static byte[] decodeB64IfNeeded(final byte[] src)
  {
    Preconditions.checkNotNull(src, "src bytes cannot be null");
    if (src.length == 0) {
      return EMPTY_BYTES;
    }
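    // Heuristic: binary- and compact-protocol structs end with a 0 stop byte, and TJSONProtocol
    // output ends with '}'. Base64 text never ends with either byte, so anything else is assumed
    // to be Base64 encoded and is decoded first.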
    final byte last = src[src.length - 1];
    return (0 == last || '}' == last) ? src : Base64.decodeBase64(src);
  }

  /**
   * Deserializes a byte array into the given Thrift object.
   * <p>
   * The binary, compact and JSON protocols are supported, and the byte array
   * may or may not be Base64 encoded; both are detected automatically.
   *
   * @param bytes     the byte array to deserialize
   * @param thriftObj the Thrift object to populate
   *
   * @return the populated Thrift object
   *
   * @throws TException if the bytes cannot be deserialized
   */
  public static <T extends TBase> T detectAndDeserialize(final byte[] bytes, final T thriftObj) throws TException
  {
    Preconditions.checkNotNull(thriftObj);
    try {
      final byte[] src = decodeB64IfNeeded(bytes);
      final TProtocolFactory protocolFactory = TProtocolUtil.guessProtocolFactory(src, null);
      Preconditions.checkNotNull(protocolFactory);
      if (protocolFactory instanceof TCompactProtocol.Factory) {
        DESERIALIZER_COMPACT.get().deserialize(thriftObj, src);
      } else if (protocolFactory instanceof TBinaryProtocol.Factory) {
        DESERIALIZER_BINARY.get().deserialize(thriftObj, src);
      } else {
        DESERIALIZER_JSON.get().deserialize(thriftObj, src);
      }
    }
    catch (final IllegalArgumentException e) {
      throw new TException(e);
    }
    return thriftObj;
  }
}


@@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.thrift;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;

import java.util.Arrays;
import java.util.List;

public class ThriftExtensionsModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    return Arrays.asList(
        new SimpleModule("ThriftInputRowParserModule")
            .registerSubtypes(
                new NamedType(ThriftInputRowParser.class, "thrift")
            )
    );
  }

  @Override
  public void configure(Binder binder)
  {
  }
}


@@ -0,0 +1,155 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.thrift;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.IAE;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.parsers.Parser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * 1. Loads the Thrift class from the classpath or a provided jar.
 * 2. Deserializes the content bytes and re-serializes them to JSON.
 * 3. Hands the JSON to the parseSpec to extract timestamp and dimensions.
 */
public class ThriftInputRowParser implements InputRowParser<Object>
{
  private final ParseSpec parseSpec;
  private final String jarPath;
  private final String thriftClassName;
  private final Parser<String, Object> parser;

  private volatile Class<TBase> thriftClass = null;

  @JsonCreator
  public ThriftInputRowParser(
      @JsonProperty("parseSpec") ParseSpec parseSpec,
      @JsonProperty("thriftJar") String jarPath,
      @JsonProperty("thriftClass") String thriftClassName
  )
  {
    this.jarPath = jarPath;
    this.thriftClassName = thriftClassName;
    Preconditions.checkNotNull(thriftClassName, "thrift class name");

    this.parseSpec = parseSpec;
    parser = parseSpec.makeParser();
  }

  public Class<TBase> getThriftClass()
      throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException
  {
    final Class<TBase> thrift;
    if (jarPath != null) {
      File jar = new File(jarPath);
      URLClassLoader child = new URLClassLoader(
          new URL[]{jar.toURI().toURL()},
          this.getClass().getClassLoader()
      );
      thrift = (Class<TBase>) Class.forName(thriftClassName, true, child);
    } else {
      thrift = (Class<TBase>) Class.forName(thriftClassName);
    }
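    // Fail fast: make sure the class can actually be instantiated before parse time.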
    thrift.newInstance();
    return thrift;
  }

  @Override
  public InputRow parse(Object input)
  {
    // The parser is checked again in phase 2 of the MapReduce job, where the thrift jar may not be
    // present on the peon side. Doing this initialization in the constructor would therefore throw
    // ClassNotFoundException there, so it is deferred until the first parse.
    try {
      if (thriftClass == null) {
        thriftClass = getThriftClass();
      }
    }
    catch (IOException e) {
      throw new IAE(e, "failed to load jar [%s]", jarPath);
    }
    catch (ClassNotFoundException e) {
      throw new IAE(e, "class [%s] not found in jar", thriftClassName);
    }
    catch (InstantiationException | IllegalAccessException e) {
      throw new IAE(e, "failed to instantiate thrift class [%s]", thriftClassName);
    }

    final String json;
    try {
      if (input instanceof ByteBuffer) { // realtime stream
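        // Note: ByteBuffer.array() exposes the entire backing array and ignores position/limit,
        // so the incoming buffer is assumed to be array-backed and sized exactly to the message.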
        final byte[] bytes = ((ByteBuffer) input).array();
        TBase o = thriftClass.newInstance();
        ThriftDeserialization.detectAndDeserialize(bytes, o);
        json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o);
      } else if (input instanceof BytesWritable) { // sequence file
        final byte[] bytes = ((BytesWritable) input).getBytes();
        TBase o = thriftClass.newInstance();
        ThriftDeserialization.detectAndDeserialize(bytes, o);
        json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o);
      } else if (input instanceof ThriftWritable) { // LzoThriftBlock file
        TBase o = (TBase) ((ThriftWritable) input).get();
        json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(o);
      } else {
        throw new IAE("unsupported input class [%s]", input.getClass());
      }
    }
    catch (IllegalAccessException | InstantiationException | TException e) {
      throw new IAE(e, "failed to deserialize thrift input");
    }

    Map<String, Object> record = parser.parse(json);
    return new MapBasedInputRow(
        parseSpec.getTimestampSpec().extractTimestamp(record),
        parseSpec.getDimensionsSpec().getDimensionNames(),
        record
    );
  }

  @Override
  public ParseSpec getParseSpec()
  {
    return parseSpec;
  }

  @Override
  public InputRowParser withParseSpec(ParseSpec parseSpec)
  {
    return new ThriftInputRowParser(parseSpec, jarPath, thriftClassName);
  }
}


@@ -0,0 +1 @@
io.druid.data.input.thrift.ThriftExtensionsModule


@@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.thrift;

import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.JSONPathFieldSpec;
import io.druid.data.input.impl.JSONPathFieldType;
import io.druid.data.input.impl.JSONPathSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.BytesWritable;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

import static org.junit.Assert.assertEquals;

public class ThriftInputRowParserTest
{
  private ParseSpec parseSpec;

  @Before
  public void setUp() throws Exception
  {
    parseSpec = new JSONParseSpec(
        new TimestampSpec("date", "auto", null),
        new DimensionsSpec(
            Lists.<DimensionSchema>newArrayList(
                new StringDimensionSchema("title"),
                new StringDimensionSchema("lastName")
            ),
            null,
            null
        ),
        new JSONPathSpec(
            true,
            Lists.newArrayList(
                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"),
                new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName")
            )
        ),
        null
    );
  }

  @Test
  public void testGetThriftClass() throws Exception
  {
    ThriftInputRowParser parser1 = new ThriftInputRowParser(
        parseSpec,
        "example/book.jar",
        "io.druid.data.input.thrift.Book"
    );
    assertEquals("io.druid.data.input.thrift.Book", parser1.getThriftClass().getName());

    ThriftInputRowParser parser2 = new ThriftInputRowParser(parseSpec, null, "io.druid.data.input.thrift.Book");
    assertEquals("io.druid.data.input.thrift.Book", parser2.getThriftClass().getName());
  }

  @Test
  public void testParse() throws Exception
  {
    ThriftInputRowParser parser = new ThriftInputRowParser(
        parseSpec,
        "example/book.jar",
        "io.druid.data.input.thrift.Book"
    );
    Book book = new Book().setDate("2016-08-29").setPrice(19.9).setTitle("title")
                          .setAuthor(new Author().setFirstName("first").setLastName("last"));

    TSerializer serializer;
    byte[] bytes;

    // 1. compact
    serializer = new TSerializer(new TCompactProtocol.Factory());
    bytes = serializer.serialize(book);
    serializationAndTest(parser, bytes);

    // 2. binary + base64
    serializer = new TSerializer(new TBinaryProtocol.Factory());
    serializationAndTest(parser, Base64.encodeBase64(serializer.serialize(book)));

    // 3. json
    serializer = new TSerializer(new TJSONProtocol.Factory());
    bytes = serializer.serialize(book);
    serializationAndTest(parser, bytes);
  }

  public void serializationAndTest(ThriftInputRowParser parser, byte[] bytes) throws TException
  {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);

    InputRow row1 = parser.parse(buffer);
    assertEquals("title", row1.getDimension("title").get(0));

    InputRow row2 = parser.parse(new BytesWritable(bytes));
    assertEquals("last", row2.getDimension("lastName").get(0));
  }
}


@@ -0,0 +1,13 @@
namespace java io.druid.data.input.thrift

struct Author {
  1: string firstName;
  2: string lastName;
}

struct Book {
  1: string date;
  2: double price;
  3: string title;
  4: Author author;
}


@@ -115,6 +115,7 @@
    <module>extensions-contrib/time-min-max</module>
    <module>extensions-contrib/google-extensions</module>
    <module>extensions-contrib/virtual-columns</module>
    <module>extensions-contrib/thrift-extensions</module>
  </modules>

  <dependencyManagement>