Adding ParserSpec for Influx Line Protocol (#5440)

* Adding ParserSpec for Influx Line Protocol

* Addressing PR feedback

- Remove extraneous TODO
- Better handling of parse errors (e.g. invalid timestamp)
- Handle sub-millisecond timestamps

* Adding documentation for Influx parser

* Fixing docs
Nathan Hartwell 2018-03-26 16:28:46 -05:00 committed by Jonathan Wei
parent ec17a44e09
commit ea30c05355
9 changed files with 749 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
---
layout: doc_page
---
# InfluxDB Line Protocol Parser
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-influx-extensions`.
This extension enables Druid to parse the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_tutorial/), a popular text-based timeseries metric serialization format.
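For example, to load it, add the extension to the load list in `common.runtime.properties` (a minimal sketch; assumes the extension is present in Druid's extensions directory):
```
druid.extensions.loadList=["druid-influx-extensions"]
```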
## Line Protocol
A typical line looks like this:
```
cpu,application=db,host=prdb123,region=us-east-1 usage_idle=99.24,usage_user=0.55 1520722030000000000
```
A line consists of four parts:
- measurement: a string naming the thing being measured (e.g. cpu, network, web_requests)
- tags: zero or more key-value pairs (i.e. dimensions)
- fields: one or more key-value pairs; values can be numeric, boolean, or string
- timestamp: nanoseconds since the Unix epoch (the parser truncates it to milliseconds)
The parser extracts these parts into a map, giving the measurement the key `measurement` and the timestamp the key `__ts`. Tag and field keys are copied verbatim, so users should take care to avoid name collisions. It is up to the ingestion spec to decide which fields should be treated as dimensions and which should be treated as metrics (typically tags correspond to dimensions and fields correspond to metrics).
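For example, the sample line above would parse to a map like the following (shown as JSON for illustration):
```json
{
  "measurement": "cpu",
  "application": "db",
  "host": "prdb123",
  "region": "us-east-1",
  "usage_idle": 99.24,
  "usage_user": 0.55,
  "__ts": 1520722030000
}
```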
The parser is configured like so:
```json
"parser": {
  "type": "string",
  "parseSpec": {
    "format": "influx",
    "timestampSpec": {
      "column": "__ts",
      "format": "millis"
    },
    "dimensionsSpec": {
      "dimensionExclusions": [
        "__ts"
      ]
    },
    "whitelistMeasurements": [
      "cpu"
    ]
  }
}
```
The `whitelistMeasurements` field is an optional list of strings. If present, measurements that do not match one of the strings in the list will be ignored.
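For example, with the whitelist above, the first line below is parsed normally and the second is rejected with a parse error:
```
cpu,host=web01 usage_idle=97.1 1520722030000000000
mem,host=web01 used_percent=64.2 1520722030000000000
```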

View File

@@ -0,0 +1,112 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-influx-extensions</artifactId>
<name>druid-influx-extensions</name>
<description>druid-influx-extensions</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.13.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
</properties>
<repositories>
</repositories>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>strict</id>
<build>
<plugins>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@@ -0,0 +1,77 @@
/** Based on v1.4 of the InfluxDB Line Protocol docs:
https://docs.influxdata.com/influxdb/v1.4/write_protocols/line_protocol_tutorial/
**/
grammar InfluxLineProtocol;
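// Example line: cpu,host=web01 usage_idle=97.1,m1_load=2i 1465839830100400200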
lines
: line ('\n' line)* '\n'? EOF
;
line
: identifier (',' tag_set)? ' ' field_set (' ' timestamp)?
;
timestamp
: NUMBER
;
field_set
: field_pair (',' field_pair)*
;
tag_set
: tag_pair (',' tag_pair)*
;
tag_pair
: identifier '=' identifier
;
field_pair
: identifier '=' field_value
;
identifier
: IDENTIFIER_STRING | NUMBER | BOOLEAN
;
field_value
: QUOTED_STRING | NUMBER | BOOLEAN
;
eol
: NEWLINE | EOF
;
NEWLINE
: '\n'
;
NUMBER
: '-'? INT ('.' [0-9]+)? 'i'?
;
BOOLEAN
: 'TRUE' | 'true' | 'True' | 't' | 'T' | 'FALSE' | 'False' | 'false' | 'F' | 'f'
;
QUOTED_STRING
: '"' (StringFieldEscapeSequence | ~(["\\]) )* '"'
;
IDENTIFIER_STRING
: (IdentifierEscapeSequence | ~([,= \n\\]) )+
;
fragment IdentifierEscapeSequence
: '\\' [,= \\]
;
fragment StringFieldEscapeSequence
: '\\' ["\\]
;
fragment INT
: '0' | [1-9] [0-9]*
;

View File

@@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.influx;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Collections;
import java.util.List;
public class InfluxExtensionsModule implements DruidModule
{
public InfluxExtensionsModule()
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.singletonList(
new SimpleModule("InfluxInputRowParserModule")
.registerSubtypes(
new NamedType(InfluxParseSpec.class, "influx")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@@ -0,0 +1,63 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.influx;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.parsers.Parser;
import java.util.List;
public class InfluxParseSpec extends ParseSpec
{
private List<String> measurementWhitelist;
@JsonCreator
public InfluxParseSpec(
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("whitelistMeasurements") List<String> measurementWhitelist
)
{
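// The timestamp column is fixed to InfluxParser.TIMESTAMP_KEY ("__ts") and is emitted in milliseconds.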
super(
new TimestampSpec(InfluxParser.TIMESTAMP_KEY, "millis", null),
dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null)
);
this.measurementWhitelist = measurementWhitelist;
}
@Override
public Parser<String, Object> makeParser()
{
if (measurementWhitelist != null && !measurementWhitelist.isEmpty()) {
return new InfluxParser(Sets.newHashSet(measurementWhitelist));
} else {
return new InfluxParser(null);
}
}
@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new InfluxParseSpec(spec, measurementWhitelist);
}
}

View File

@@ -0,0 +1,173 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.influx;
import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.common.parsers.Parser;
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.TokenStream;
import javax.annotation.Nullable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class InfluxParser implements Parser<String, Object>
{
public static final String TIMESTAMP_KEY = "__ts";
private static final String MEASUREMENT_KEY = "measurement";
private final Set<String> measurementWhitelist;
public InfluxParser(Set<String> measurementWhitelist)
{
this.measurementWhitelist = measurementWhitelist;
}
@Override
public void startFileFromBeginning()
{
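// Nothing to do; the parser keeps no per-file state.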
}
@Nullable
@Override
public Map<String, Object> parseToMap(String input)
{
CharStream charStream = new ANTLRInputStream(input);
InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
TokenStream tokenStream = new CommonTokenStream(lexer);
InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
if (parser.getNumberOfSyntaxErrors() != 0) {
throw new ParseException("Unable to parse line.");
}
if (lines.size() != 1) {
throw new ParseException("Multiple lines present; unable to parse more than one per record.");
}
Map<String, Object> out = new LinkedHashMap<>();
InfluxLineProtocolParser.LineContext line = lines.get(0);
String measurement = parseIdentifier(line.identifier());
if (!checkWhitelist(measurement)) {
throw new ParseException("Metric not whitelisted.");
}
out.put(MEASUREMENT_KEY, measurement);
if (line.tag_set() != null) {
line.tag_set().tag_pair().forEach(t -> parseTag(t, out));
}
line.field_set().field_pair().forEach(t -> parseField(t, out));
if (line.timestamp() != null) {
String timestamp = line.timestamp().getText();
parseTimestamp(timestamp, out);
}
return out;
}
private void parseTag(InfluxLineProtocolParser.Tag_pairContext tag, Map<String, Object> out)
{
String key = parseIdentifier(tag.identifier(0));
String value = parseIdentifier(tag.identifier(1));
out.put(key, value);
}
private void parseField(InfluxLineProtocolParser.Field_pairContext field, Map<String, Object> out)
{
String key = parseIdentifier(field.identifier());
InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
Object value;
if (valueContext.NUMBER() != null) {
value = parseNumber(valueContext.NUMBER().getText());
} else if (valueContext.BOOLEAN() != null) {
value = parseBool(valueContext.BOOLEAN().getText());
} else {
value = parseQuotedString(valueContext.QUOTED_STRING().getText());
}
out.put(key, value);
}
private Object parseQuotedString(String text)
{
// Strip the surrounding quotes and unescape \" and \\ (the two StringFieldEscapeSequence forms)
return text.substring(1, text.length() - 1).replaceAll("\\\\([\"\\\\])", "$1");
}
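// Influx integer fields carry a trailing 'i' (e.g. "2i"); bare numbers are parsed as doubles.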
private Object parseNumber(String raw)
{
if (raw.endsWith("i")) {
return Long.valueOf(raw.substring(0, raw.length() - 1));
}
return Double.valueOf(raw);
}
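// Influx accepts t/T/true/True/TRUE (and likewise for false); normalize to the strings "true"/"false".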
private Object parseBool(String raw)
{
char first = raw.charAt(0);
if (first == 't' || first == 'T') {
return "true";
} else {
return "false";
}
}
private String parseIdentifier(InfluxLineProtocolParser.IdentifierContext ctx)
{
if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
return ctx.getText();
}
return ctx.IDENTIFIER_STRING().getText().replaceAll("\\\\([,= ])", "$1");
}
private boolean checkWhitelist(String m)
{
return (measurementWhitelist == null) || measurementWhitelist.contains(m);
}
private void parseTimestamp(String timestamp, Map<String, Object> dest)
{
// Influx timestamps come in nanoseconds; treat anything less than 1 ms as 0
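// e.g. "1465839830100400200" (ns) becomes 1465839830100 (ms); the last six digits are dropped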
if (timestamp.length() < 7) {
dest.put(TIMESTAMP_KEY, 0L);
} else {
timestamp = timestamp.substring(0, timestamp.length() - 6);
long timestampMillis = Long.parseLong(timestamp);
dest.put(TIMESTAMP_KEY, timestampMillis);
}
}
@Override
public List<String> getFieldNames()
{
return ImmutableList.of();
}
@Override
public void setFieldNames(Iterable<String> fieldNames)
{
}
}

View File

@@ -0,0 +1 @@
io.druid.data.input.influx.InfluxExtensionsModule

View File

@@ -0,0 +1,225 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.influx;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.common.parsers.Parser;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;
@RunWith(JUnitParamsRunner.class)
public class InfluxParserTest
{
private String name;
private String input;
private Map<String, Object> expected;
private static Object[] testCase(String name, String input, Parsed expected)
{
return Lists.newArrayList(name, input, expected).toArray();
}
public Object[] testData()
{
return Lists.newArrayList(
testCase(
"real sample",
"cpu,host=foo.bar.baz,region=us-east-1,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2i 1465839830100400200",
Parsed.row("cpu", 1465839830100L)
.with("host", "foo.bar.baz")
.with("region", "us-east-1")
.with("application", "echo")
.with("pct_idle", 99.3)
.with("pct_user", 88.8)
.with("m1_load", 2L)
),
testCase(
"negative timestamp",
"foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i -123456789",
Parsed.row("foo", -123L)
.with("region", "us-east-1")
.with("host", "127.0.0.1")
.with("m", 1.0)
.with("n", 3.0)
.with("o", 500L)
),
testCase(
"truncated timestamp",
"foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=500i 123",
Parsed.row("foo", 0L)
.with("region", "us-east-1")
.with("host", "127.0.0.1")
.with("m", 1.0)
.with("n", 3.0)
.with("o", 500L)
),
testCase(
"special characters",
"!@#$%^&*()_-\\=+,+++\\ +++=--\\ --- __**__=\"ü\" 123456789",
Parsed.row("!@#$%^&*()_-=+", 123L)
.with("+++ +++", "-- ---")
.with("__**__", "127.0.0.1")
.with("__**__", "ü")
),
testCase(
"unicode characters",
"\uD83D\uDE00,\uD83D\uDE05=\uD83D\uDE06 \uD83D\uDE0B=100i,b=\"\uD83D\uDE42\" 123456789",
Parsed.row("\uD83D\uDE00", 123L)
.with("\uD83D\uDE05", "\uD83D\uDE06")
.with("\uD83D\uDE0B", 100L)
.with("b", "\uD83D\uDE42")
),
testCase(
"quoted string measurement value",
"foo,region=us-east-1,host=127.0.0.1 m=1.0,n=3.0,o=\"something \\\"cool\\\" \" 123456789",
Parsed.row("foo", 123L)
.with("region", "us-east-1")
.with("host", "127.0.0.1")
.with("m", 1.0)
.with("n", 3.0)
.with("o", "something \"cool\" ")
),
testCase(
"no tags",
"foo m=1.0,n=3.0 123456789",
Parsed.row("foo", 123L)
.with("m", 1.0)
.with("n", 3.0)
),
testCase(
"Escaped characters in identifiers",
"f\\,oo\\ \\=,bar=baz m=1.0,n=3.0 123456789",
Parsed.row("f,oo =", 123L)
.with("bar", "baz")
.with("m", 1.0)
.with("n", 3.0)
),
testCase(
"Escaped characters in identifiers",
"foo\\ \\=,bar=baz m=1.0,n=3.0 123456789",
Parsed.row("foo =", 123L)
.with("bar", "baz")
.with("m", 1.0)
.with("n", 3.0)
)
).toArray();
}
@Test
@Parameters(method = "testData")
public void testParse(String name, String input, Parsed expected)
{
Parser<String, Object> parser = new InfluxParser(null);
Map<String, Object> parsed = parser.parseToMap(input);
assertThat("correct measurement name", parsed.get("measurement"), equalTo(expected.measurement));
assertThat("correct timestamp", parsed.get(InfluxParser.TIMESTAMP_KEY), equalTo(expected.timestamp));
expected.kv.forEach((k, v) -> {
assertThat("correct field " + k, parsed.get(k), equalTo(v));
});
parsed.remove("measurement");
parsed.remove(InfluxParser.TIMESTAMP_KEY);
assertThat("No extra keys in parsed data", parsed.keySet(), equalTo(expected.kv.keySet()));
}
@Test
public void testParseWhitelistPass()
{
Parser<String, Object> parser = new InfluxParser(Sets.newHashSet("cpu"));
String input = "cpu,host=foo.bar.baz,region=us-east,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200";
Map<String, Object> parsed = parser.parseToMap(input);
assertThat(parsed.get("measurement"), equalTo("cpu"));
}
@Test
public void testParseWhitelistFail()
{
Parser<String, Object> parser = new InfluxParser(Sets.newHashSet("mem"));
String input = "cpu,host=foo.bar.baz,region=us-east,application=echo pct_idle=99.3,pct_user=88.8,m1_load=2 1465839830100400200";
try {
parser.parseToMap(input);
}
catch (ParseException t) {
assertThat(t, isA(ParseException.class));
return;
}
Assert.fail("Exception not thrown");
}
public Object[] failureTestData()
{
return Lists.newArrayList(
Pair.of("Empty line", ""),
Pair.of("Invalid measurement", "invalid measurement"),
Pair.of("Invalid timestamp", "foo i=123 123x")
).toArray();
}
@Test
@Parameters(method = "failureTestData")
public void testParseFailures(Pair<String, String> testCase)
{
Parser<String, Object> parser = new InfluxParser(null);
try {
parser.parseToMap(testCase.rhs);
}
catch (ParseException t) {
assertThat(t, isA(ParseException.class));
return;
}
Assert.fail(testCase.rhs + ": exception not thrown");
}
private static class Parsed
{
private String measurement;
private Long timestamp;
private Map<String, Object> kv = new HashMap<>();
public static Parsed row(String measurement, Long timestamp)
{
Parsed e = new Parsed();
e.measurement = measurement;
e.timestamp = timestamp;
return e;
}
public Parsed with(String k, Object v)
{
kv.put(k, v);
return this;
}
}
}

View File

@@ -125,6 +125,7 @@
<module>extensions-core/simple-client-sslcontext</module>
<module>extensions-core/druid-basic-security</module>
<!-- Community extensions -->
<module>extensions-contrib/influx-extensions</module>
<module>extensions-contrib/azure-extensions</module>
<module>extensions-contrib/cassandra-storage</module>
<module>extensions-contrib/druid-rocketmq</module>