diff --git a/api/src/main/java/io/druid/data/input/MapBasedRow.java b/api/src/main/java/io/druid/data/input/MapBasedRow.java index 8e5af6541d0..5a7c4db0373 100644 --- a/api/src/main/java/io/druid/data/input/MapBasedRow.java +++ b/api/src/main/java/io/druid/data/input/MapBasedRow.java @@ -21,15 +21,10 @@ package io.druid.data.input; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; import io.druid.guice.annotations.PublicApi; import io.druid.java.util.common.DateTimes; -import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.parsers.ParseException; import org.joda.time.DateTime; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -38,8 +33,6 @@ import java.util.Map; @PublicApi public class MapBasedRow implements Row { - private static final Long LONG_ZERO = 0L; - private final DateTime timestamp; private final Map event; @@ -83,16 +76,7 @@ public class MapBasedRow implements Row @Override public List getDimension(String dimension) { - final Object dimValue = event.get(dimension); - - if (dimValue == null) { - return Collections.emptyList(); - } else if (dimValue instanceof List) { - // guava's toString function fails on null objects, so please do not use it - return Lists.transform((List) dimValue, String::valueOf); - } else { - return Collections.singletonList(String.valueOf(dimValue)); - } + return Rows.objectToStrings(event.get(dimension)); } @Override @@ -104,44 +88,7 @@ public class MapBasedRow implements Row @Override public Number getMetric(String metric) { - Object metricValue = event.get(metric); - - if (metricValue == null) { - return LONG_ZERO; - } - - if (metricValue instanceof Number) { - return (Number) metricValue; - } else if (metricValue instanceof String) { - try { - String metricValueString = StringUtils.removeChar(((String) metricValue).trim(), ','); - // Longs.tryParse() doesn't support leading '+', so we need to trim it ourselves - metricValueString = trimLeadingPlusOfLongString(metricValueString); - Long v = Longs.tryParse(metricValueString); - // Do NOT use ternary operator here, because it makes Java to convert Long to Double - if (v != null) { - return v; - } else { - return Double.valueOf(metricValueString); - } - } - catch (Exception e) { - throw new ParseException(e, "Unable to parse metrics[%s], value[%s]", metric, metricValue); - } - } else { - throw new ParseException("Unknown type[%s]", metricValue.getClass()); - } - } - - private static String trimLeadingPlusOfLongString(String metricValueString) - { - if (metricValueString.length() > 1 && metricValueString.charAt(0) == '+') { - char secondChar = metricValueString.charAt(1); - if (secondChar >= '0' && secondChar <= '9') { - metricValueString = metricValueString.substring(1); - } - } - return metricValueString; + return Rows.objectToNumber(metric, event.get(metric)); } @Override
diff --git a/api/src/main/java/io/druid/data/input/Rows.java b/api/src/main/java/io/druid/data/input/Rows.java index a31d1b3a224..0ef09e9fa24 100644 --- a/api/src/main/java/io/druid/data/input/Rows.java +++ b/api/src/main/java/io/druid/data/input/Rows.java @@ -22,7 +22,12 @@ package io.druid.data.input; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Maps; +import com.google.common.primitives.Longs; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.parsers.ParseException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,9 +36,12 @@ import java.util.Set; */ public class Rows { + public static final Long LONG_ZERO = 0L; + /** * @param timeStamp rollup up timestamp to be used to create group key - * @param inputRow input row + * @param inputRow input row + * + * @return groupKey for the given input row */ public static List toGroupKey(long timeStamp, InputRow inputRow) @@ -50,4 +58,77 @@ public class Rows dims ); } + + /** + * Convert an object to a list of strings. + */ + public static List<String> objectToStrings(final Object inputValue) + { + if (inputValue == null) { + return Collections.emptyList(); + } else if (inputValue instanceof List) { + // guava's toString function fails on null objects, so please do not use it + final List values = (List) inputValue; + + final List<String> retVal = new ArrayList<>(values.size()); + for (Object val : values) { + retVal.add(String.valueOf(val)); + } + + return retVal; + } else { + return Collections.singletonList(String.valueOf(inputValue)); + } + } + + /** + * Convert an object to a number. Nulls are treated as zeroes. + * + * @param name field name of the object being converted (may be used for exception messages) + * @param inputValue the actual object being converted + * + * @return a number + * + * @throws ParseException if the column cannot be converted to a number + */ + public static Number objectToNumber(final String name, final Object inputValue) + { + if (inputValue == null) { + return Rows.LONG_ZERO; + } + + if (inputValue instanceof Number) { + return (Number) inputValue; + } else if (inputValue instanceof String) { + try { + String metricValueString = StringUtils.removeChar(((String) inputValue).trim(), ','); + // Longs.tryParse() doesn't support leading '+', so we need to trim it ourselves + metricValueString = trimLeadingPlusOfLongString(metricValueString); + Long v = Longs.tryParse(metricValueString); + // Do NOT use ternary operator here, because it makes Java to convert Long to Double + if (v != null) { + return v; + } else { + return Double.valueOf(metricValueString); + } + } + catch (Exception e) { + throw new ParseException(e, "Unable to parse value[%s] for field[%s]", inputValue, name); + } + } else { + throw new ParseException("Unknown type[%s] for field[%s]", inputValue.getClass(), name); + } + } + + private static String trimLeadingPlusOfLongString(String metricValueString) + { + if (metricValueString.length() > 1 && metricValueString.charAt(0) == '+') { + char secondChar = metricValueString.charAt(1); + if (secondChar >= '0' && secondChar <= '9') { + metricValueString = metricValueString.substring(1); + } + } + return metricValueString; + } }
diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java index 3d48a95abe7..12e0c3029f0 100644 --- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java @@ -121,7 +121,8 @@ public class DimensionsSpec return dimensionExclusions; } - @Deprecated @JsonIgnore + @Deprecated + @JsonIgnore public List getSpatialDimensions() { Iterable filteredList = Iterables.filter( @@ -244,4 +245,13 @@ public class DimensionsSpec result = 31 * result + dimensionExclusions.hashCode(); return result; } + + @Override + public String toString()
+ { + return "DimensionsSpec{" + + "dimensions=" + dimensions + + ", dimensionExclusions=" + dimensionExclusions + + '}'; + } }
diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index d1d0606ea49..eb60b366877 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -24,6 +24,7 @@ import io.druid.data.input.InputRow; import io.druid.utils.Runnables; import org.apache.commons.io.LineIterator; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Iterator; @@ -69,6 +70,7 @@ public class FileIteratingFirehose implements Firehose return lineIterator != null && lineIterator.hasNext(); } + @Nullable @Override public InputRow nextRow() { diff --git a/api/src/main/java/io/druid/data/input/impl/InputRowParser.java b/api/src/main/java/io/druid/data/input/impl/InputRowParser.java index daec114ec7e..9b6ca0bb979 100644 --- a/api/src/main/java/io/druid/data/input/impl/InputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/InputRowParser.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.data.input.InputRow; import io.druid.guice.annotations.ExtensionPoint; +import javax.annotation.Nullable; + @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class) @JsonSubTypes(value = { @@ -33,6 +35,11 @@ import io.druid.guice.annotations.ExtensionPoint; }) public interface InputRowParser { + /** + * Parses an input into an {@link InputRow}. Returns null if this input should be thrown away, or throws + * {@code ParseException} if the input is unparseable.
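+ * A null return is counted by callers as a row thrown away rather than as a parse error; see, for example, the null-row handling added to HadoopDruidIndexerMapper and IndexTask elsewhere in this patch.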
+ */ + @Nullable InputRow parse(T input); ParseSpec getParseSpec(); diff --git a/api/src/main/java/io/druid/data/input/impl/JSONParseSpec.java b/api/src/main/java/io/druid/data/input/impl/JSONParseSpec.java index 357cbb16a61..ce33d41ed48 100644 --- a/api/src/main/java/io/druid/data/input/impl/JSONParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/JSONParseSpec.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.parsers.Parser; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** */ @@ -97,4 +98,38 @@ public class JSONParseSpec extends ParseSpec { return featureSpec; } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final JSONParseSpec that = (JSONParseSpec) o; + return Objects.equals(flattenSpec, that.flattenSpec) && + Objects.equals(featureSpec, that.featureSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), flattenSpec, featureSpec); + } + + @Override + public String toString() + { + return "JSONParseSpec{" + + "timestampSpec=" + getTimestampSpec() + + ", dimensionsSpec=" + getDimensionsSpec() + + ", flattenSpec=" + flattenSpec + + ", featureSpec=" + featureSpec + + '}'; + } } diff --git a/docs/content/misc/math-expr.md b/docs/content/misc/math-expr.md index 67adfa13baf..f4d0db7f9e3 100644 --- a/docs/content/misc/math-expr.md +++ b/docs/content/misc/math-expr.md @@ -13,13 +13,16 @@ This expression language supports the following operators (listed in decreasing |<, <=, >, >=, ==, !=|Binary Comparison| |&&,\|\||Binary Logical AND, OR| -Long, double and string data types are supported. If a number contains a dot, it is interpreted as a double, otherwise it is interpreted as a long. That means, always add a '.' to your number if you want it interpreted as a double value. String literal should be quoted by single quotation marks. +Long, double, and string data types are supported. If a number contains a dot, it is interpreted as a double, otherwise it is interpreted as a long. That means, always add a '.' to your number if you want it interpreted as a double value. String literals should be quoted by single quotation marks. -Expressions can contain variables. Variable names may contain letters, digits, '\_' and '$'. Variable names must not begin with a digit. To escape other special characters, user can quote it with double quotation marks. +Multi-value types are not fully supported yet. Expressions may behave inconsistently on multi-value types, and you +should not rely on the behavior in this case to stay the same in future releases. -For logical operators, a number is true if and only if it is positive (0 or minus value means false). For string type, it's evaluation result of 'Boolean.valueOf(string)'. +Expressions can contain variables. Variable names may contain letters, digits, '\_' and '$'. Variable names must not begin with a digit. To escape other special characters, you can quote it with double quotation marks. -Also, the following built-in functions are supported. +For logical operators, a number is true if and only if it is positive (0 or negative value means false). For string type, it's the evaluation result of 'Boolean.valueOf(string)'. + +The following built-in functions are available. 
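+
+As a quick illustration of the rules above (the variable names here are hypothetical, and the trailing notes are annotations rather than expression syntax):
+
+```
+2 + 3            both literals are longs, so long arithmetic is used
+2 + 3.0          3.0 contains a dot, so the expression is evaluated using doubles
+'apache'         a string literal, quoted by single quotation marks
+"my-field" + 1   double quotation marks escape the '-' in the variable name
+x > 0 && x < 10  true only when both comparisons evaluate to a positive number
+```
+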
## General functions diff --git a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index 8f11561df42..db1d95b36b7 100644 --- a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -250,6 +250,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory +public class RocketMQFirehoseFactory implements FirehoseFactory> { private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); @@ -139,7 +140,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory byteBufferInputRowParser, File temporaryDirectory ) throws IOException, ParseException { @@ -149,7 +150,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory theParser = byteBufferInputRowParser.withParseSpec( byteBufferInputRowParser.getParseSpec() .withDimensionsSpec( byteBufferInputRowParser.getParseSpec() @@ -247,6 +248,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory parser = (InputRowParser) config.getParser(); reader.initialize(split, context); diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index 4d2ae18e466..d68f2e93dc5 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -206,6 +206,7 @@ public class OrcIndexGeneratorJobTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval) ), + null, mapper ), new HadoopIOConfig( diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index 66c91770a2b..bbd54aa44f9 100644 --- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -30,10 +30,10 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; -import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.logger.Logger; import net.jodah.lyra.ConnectionOptions; import net.jodah.lyra.Connections; @@ -41,6 +41,7 @@ import net.jodah.lyra.config.Config; import net.jodah.lyra.retry.RetryPolicy; import net.jodah.lyra.util.Duration; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -100,7 +101,7 @@ import java.util.concurrent.LinkedBlockingQueue; * For more information on RabbitMQ high availability please see: * http://www.rabbitmq.com/ha.html. 
*/ -public class RabbitMQFirehoseFactory implements FirehoseFactory +public class RabbitMQFirehoseFactory implements FirehoseFactory> { private static final Logger log = new Logger(RabbitMQFirehoseFactory.class); @@ -135,7 +136,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory firehoseParser, File temporaryDirectory) throws IOException { ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory); Config lyraConfig = new Config() @@ -225,6 +226,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory +public class KafkaEightFirehoseFactory implements FirehoseFactory> { private static final Logger log = new Logger(KafkaEightFirehoseFactory.class); @@ -69,13 +69,14 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory firehoseParser, File temporaryDirectory) throws IOException { Set newDimExclus = Sets.union( firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), Sets.newHashSet("feed") ); - final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec( + + final InputRowParser theParser = firehoseParser.withParseSpec( firehoseParser.getParseSpec() .withDimensionsSpec( firehoseParser.getParseSpec() @@ -111,6 +112,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactoryof("dim1", "dim2")), + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t", "dim2")), null, null ), @@ -199,6 +200,7 @@ public class KafkaIndexTaskTest ), new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null, objectMapper ); @@ -235,11 +237,7 @@ public class KafkaIndexTaskTest emitter = new ServiceEmitter( "service", "host", - new LoggingEmitter( - log, - LoggingEmitter.Level.ERROR, - new DefaultObjectMapper() - ) + new NoopEmitter() ); emitter.start(); EmittingLogger.registerEmitter(emitter); @@ -325,7 +323,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -349,8 +346,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -369,7 +366,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -405,8 +401,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -425,7 +421,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -461,8 +456,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -481,7 +476,6 @@ public class KafkaIndexTaskTest DateTimes.of("2010"), false ), - null, null ); @@ -518,9 +512,71 @@ public class 
KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("a"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc2)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3)); + } + + @Test(timeout = 60_000L) + public void testRunWithTransformSpec() throws Exception + { + final KafkaIndexTask task = createTask( + null, + DATA_SCHEMA.withTransformSpec( + new TransformSpec( + new SelectorDimFilter("dim1", "b", null), + ImmutableList.of( + new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()) + ) + ) + ), + new KafkaIOConfig( + "sequence0", + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true, + false, + null, + null, + false + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for the task to start reading + while (task.getStatus() != KafkaIndexTask.Status.READING) { + Thread.sleep(10); + } + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + } + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(1, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(4, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", desc1)); } @Test(timeout = 60_000L) @@ -546,7 +602,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -589,7 +644,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -613,8 +667,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -643,7 +697,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -667,8 +720,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -696,7 +749,6 @@ public class KafkaIndexTaskTest null, false ), - null, null 
); @@ -731,7 +783,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); final KafkaIndexTask task2 = createTask( @@ -747,7 +798,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -783,8 +833,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -803,7 +853,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); final KafkaIndexTask task2 = createTask( @@ -819,7 +868,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -843,8 +891,8 @@ public class KafkaIndexTaskTest Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); // Check published segments & metadata, should all be from the first task SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -856,8 +904,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -876,7 +924,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); final KafkaIndexTask task2 = createTask( @@ -892,7 +939,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -922,8 +968,8 @@ public class KafkaIndexTaskTest Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); // Check published segments & metadata SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1); @@ -932,10 +978,10 @@ public class KafkaIndexTaskTest Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3)); - Assert.assertEquals(ImmutableList.of("f"), readSegmentDim1(desc4)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3)); + 
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4)); } @Test(timeout = 60_000L) @@ -954,7 +1000,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -988,13 +1033,13 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc4)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc4)); // Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically Assert.assertEquals( ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")), - ImmutableSet.of(readSegmentDim1(desc2), readSegmentDim1(desc3)) + ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3)) ); } @@ -1014,7 +1059,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); final KafkaIndexTask task2 = createTask( @@ -1030,7 +1074,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -1067,9 +1110,9 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); - Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc3)); } @Test(timeout = 60_000L) @@ -1088,7 +1131,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -1125,7 +1167,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -1159,8 +1200,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -1179,7 +1220,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -1245,8 +1285,8 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } @Test(timeout = 60_000L) @@ -1265,7 +1305,6 @@ public class KafkaIndexTaskTest null, false ), - null, null ); @@ -1333,9 +1372,9 @@ public class KafkaIndexTaskTest ); // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc1)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc2)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3)); } @Test(timeout = 30_000L) @@ -1354,7 +1393,6 @@ public class KafkaIndexTaskTest null, 
false ), - null, null ); @@ -1394,7 +1432,6 @@ public class KafkaIndexTaskTest null, false ), - null, true ); @@ -1464,13 +1501,22 @@ public class KafkaIndexTaskTest private KafkaIndexTask createTask( final String taskId, final KafkaIOConfig ioConfig, - final Integer maxRowsPerSegment, + final Boolean resetOffsetAutomatically + ) + { + return createTask(taskId, DATA_SCHEMA, ioConfig, resetOffsetAutomatically); + } + + private KafkaIndexTask createTask( + final String taskId, + final DataSchema dataSchema, + final KafkaIOConfig ioConfig, final Boolean resetOffsetAutomatically ) { final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( 1000, - maxRowsPerSegment, + null, new Period("P1Y"), null, null, @@ -1483,7 +1529,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = new KafkaIndexTask( taskId, null, - cloneDataSchema(), + cloneDataSchema(dataSchema), tuningConfig, ioConfig, null, @@ -1494,13 +1540,14 @@ public class KafkaIndexTaskTest return task; } - private static DataSchema cloneDataSchema() + private static DataSchema cloneDataSchema(final DataSchema dataSchema) { return new DataSchema( - DATA_SCHEMA.getDataSource(), - DATA_SCHEMA.getParserMap(), - DATA_SCHEMA.getAggregators(), - DATA_SCHEMA.getGranularitySpec(), + dataSchema.getDataSource(), + dataSchema.getParserMap(), + dataSchema.getAggregators(), + dataSchema.getGranularitySpec(), + dataSchema.getTransformSpec(), objectMapper ); } @@ -1696,7 +1743,7 @@ public class KafkaIndexTaskTest return new File(directory, "segments"); } - private List readSegmentDim1(final SegmentDescriptor descriptor) throws IOException + private List readSegmentColumn(final String column, final SegmentDescriptor descriptor) throws IOException { File indexZip = new File( StringUtils.format( @@ -1728,11 +1775,11 @@ public class KafkaIndexTaskTest ); IndexIO indexIO = new TestUtils().getTestIndexIO(); QueryableIndex index = indexIO.loadIndex(outputLocation); - DictionaryEncodedColumn dim1 = index.getColumn("dim1").getDictionaryEncoding(); + DictionaryEncodedColumn theColumn = index.getColumn(column).getDictionaryEncoding(); List values = Lists.newArrayList(); - for (int i = 0; i < dim1.length(); i++) { - int id = dim1.getSingleValueRow(i); - String value = dim1.lookupName(id); + for (int i = 0; i < theColumn.length(); i++) { + int id = theColumn.getSingleValueRow(i); + String value = theColumn.lookupName(id); values.add(value); } return values; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 96244ca559e..3fbfc11f726 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1895,6 +1895,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Granularities.NONE, ImmutableList.of() ), + null, objectMapper ); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index d50549c1ee9..a90bd38ac24 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -30,6 +30,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec; 
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import javax.annotation.Nullable; import java.io.IOException; public abstract class HadoopDruidIndexerMapper extends Mapper @@ -75,10 +76,15 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< if (reportParseExceptions) { throw e; } - log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString()); + log.debug(e, "Ignoring invalid row [%s] due to parsing error", value); context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); return; // we're ignoring this invalid row + } + if (inputRow == null) { + // Throw away null rows from the parser. + log.debug("Throwing away row [%s]", value); + return; } if (!granularitySpec.bucketIntervals().isPresent() @@ -92,7 +98,8 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< } } - public final static InputRow parseInputRow(Object value, InputRowParser parser) + @Nullable + public static InputRow parseInputRow(Object value, InputRowParser parser) { if (parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before @@ -101,6 +108,9 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< return ((StringInputRowParser) parser).parse(value.toString()); } else if (value instanceof InputRow) { return (InputRow) value; + } else if (value == null) { + // Pass through nulls so they get thrown away. + return null; } else { return parser.parse(value); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java index 87e671f5878..b55bdf4aa7c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -26,10 +26,12 @@ import com.google.common.collect.ImmutableList; import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.filter.DimFilter; +import io.druid.segment.indexing.TransformSpec; import io.druid.timeline.DataSegment; import org.joda.time.Interval; import java.util.List; +import java.util.Objects; public class DatasourceIngestionSpec { @@ -41,6 +43,10 @@ public class DatasourceIngestionSpec private final List metrics; private final boolean ignoreWhenNoSegments; + // Note that the only purpose of the transformSpec field is to hold the value from the overall dataSchema. + // It is not meant to be provided by end users, and will be overwritten. 
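+ // DatasourcePathSpec injects the job-level value via withTransformSpec() before this spec is serialized into the Hadoop job configuration, and DatasourceRecordReader passes it on to IngestSegmentFirehose.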
+ private final TransformSpec transformSpec; + @JsonCreator public DatasourceIngestionSpec( @JsonProperty("dataSource") String dataSource, @@ -50,7 +56,8 @@ public class DatasourceIngestionSpec @JsonProperty("filter") DimFilter filter, @JsonProperty("dimensions") List dimensions, @JsonProperty("metrics") List metrics, - @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments + @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments, + @JsonProperty("transformSpec") TransformSpec transformSpec ) { this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); @@ -59,7 +66,7 @@ public class DatasourceIngestionSpec interval == null || intervals == null, "please specify intervals only" ); - + List theIntervals = null; if (interval != null) { theIntervals = ImmutableList.of(interval); @@ -78,6 +85,7 @@ public class DatasourceIngestionSpec this.metrics = metrics; this.ignoreWhenNoSegments = ignoreWhenNoSegments; + this.transformSpec = transformSpec != null ? transformSpec : TransformSpec.NONE; } @JsonProperty @@ -122,6 +130,12 @@ public class DatasourceIngestionSpec return ignoreWhenNoSegments; } + @JsonProperty + public TransformSpec getTransformSpec() + { + return transformSpec; + } + public DatasourceIngestionSpec withDimensions(List dimensions) { return new DatasourceIngestionSpec( @@ -132,7 +146,8 @@ public class DatasourceIngestionSpec filter, dimensions, metrics, - ignoreWhenNoSegments + ignoreWhenNoSegments, + transformSpec ); } @@ -146,7 +161,8 @@ public class DatasourceIngestionSpec filter, dimensions, metrics, - ignoreWhenNoSegments + ignoreWhenNoSegments, + transformSpec ); } @@ -160,7 +176,8 @@ public class DatasourceIngestionSpec filter, dimensions, metrics, - ignoreWhenNoSegments + ignoreWhenNoSegments, + transformSpec ); } @@ -174,12 +191,28 @@ public class DatasourceIngestionSpec filter, dimensions, metrics, - ignoreWhenNoSegments + ignoreWhenNoSegments, + transformSpec + ); + } + + public DatasourceIngestionSpec withTransformSpec(TransformSpec transformSpec) + { + return new DatasourceIngestionSpec( + dataSource, + null, + intervals, + segments, + filter, + dimensions, + metrics, + ignoreWhenNoSegments, + transformSpec ); } @Override - public boolean equals(Object o) + public boolean equals(final Object o) { if (this == o) { return true; @@ -187,42 +220,30 @@ public class DatasourceIngestionSpec if (o == null || getClass() != o.getClass()) { return false; } - - DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; - - if (ignoreWhenNoSegments != that.ignoreWhenNoSegments) { - return false; - } - if (!dataSource.equals(that.dataSource)) { - return false; - } - if (!intervals.equals(that.intervals)) { - return false; - } - if (segments != null ? !segments.equals(that.segments) : that.segments != null) { - return false; - } - if (filter != null ? !filter.equals(that.filter) : that.filter != null) { - return false; - } - if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { - return false; - } - return !(metrics != null ? 
!metrics.equals(that.metrics) : that.metrics != null); - + final DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; + return ignoreWhenNoSegments == that.ignoreWhenNoSegments && + Objects.equals(dataSource, that.dataSource) && + Objects.equals(intervals, that.intervals) && + Objects.equals(segments, that.segments) && + Objects.equals(filter, that.filter) && + Objects.equals(dimensions, that.dimensions) && + Objects.equals(metrics, that.metrics) && + Objects.equals(transformSpec, that.transformSpec); } @Override public int hashCode() { - int result = dataSource.hashCode(); - result = 31 * result + intervals.hashCode(); - result = 31 * result + (segments != null ? segments.hashCode() : 0); - result = 31 * result + (filter != null ? filter.hashCode() : 0); - result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); - result = 31 * result + (metrics != null ? metrics.hashCode() : 0); - result = 31 * result + (ignoreWhenNoSegments ? 1 : 0); - return result; + return Objects.hash( + dataSource, + intervals, + segments, + filter, + dimensions, + metrics, + ignoreWhenNoSegments, + transformSpec + ); } @Override @@ -236,6 +257,7 @@ public class DatasourceIngestionSpec ", dimensions=" + dimensions + ", metrics=" + metrics + ", ignoreWhenNoSegments=" + ignoreWhenNoSegments + + ", transformSpec=" + transformSpec + '}'; } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java index c703b0380f2..b4856a925fc 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -24,13 +24,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; - import io.druid.data.input.InputRow; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.JobHelper; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -61,6 +59,7 @@ public class DatasourceInputFormat extends InputFormat public static final String CONF_INPUT_SEGMENTS = "druid.segments"; public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; + public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec"; public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; @Override diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java index be6eb64a268..7f364a36782 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java @@ -27,8 +27,7 @@ import com.google.common.collect.Lists; import com.google.common.io.Closeables; import com.google.common.io.Files; import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; -import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.JobHelper; import io.druid.java.util.common.ISE; @@ -57,7 +56,7 @@ public class DatasourceRecordReader extends 
RecordReader private IngestSegmentFirehose firehose; private int rowNum; - private MapBasedRow currRow; + private Row currRow; private List indexes = Lists.newArrayList(); private List tmpSegmentDirs = Lists.newArrayList(); @@ -108,18 +107,18 @@ public class DatasourceRecordReader extends RecordReader firehose = new IngestSegmentFirehose( adapters, + spec.getTransformSpec(), spec.getDimensions(), spec.getMetrics(), spec.getFilter() ); - } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (firehose.hasMore()) { - currRow = (MapBasedRow) firehose.nextRow(); + currRow = firehose.nextRow(); rowNum++; return true; } else { @@ -136,13 +135,7 @@ public class DatasourceRecordReader extends RecordReader @Override public InputRow getCurrentValue() throws IOException, InterruptedException { - return new SegmentInputRow( - new MapBasedInputRow( - currRow.getTimestamp(), - spec.getDimensions(), - currRow.getEvent() - ) - ); + return currRow == null ? null : new SegmentInputRow(currRow, spec.getDimensions()); } @Override diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java index 7371ade58b4..cf077eb32b2 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java @@ -33,17 +33,19 @@ import java.util.List; */ public class SegmentInputRow implements InputRow { - private final InputRow delegate; + private final Row delegate; + private final List dimensions; - public SegmentInputRow(InputRow delegate) + public SegmentInputRow(Row delegate, List dimensions) { this.delegate = delegate; + this.dimensions = dimensions; } @Override public List getDimensions() { - return delegate.getDimensions(); + return dimensions; } @Override @@ -82,11 +84,6 @@ public class SegmentInputRow implements InputRow return delegate.compareTo(row); } - public InputRow getDelegate() - { - return delegate; - } - @Override public String toString() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index 15711eabb83..e89ca3c0f0f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -28,7 +28,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexer.hadoop.DatasourceInputFormat; @@ -158,6 +157,11 @@ public class DatasourcePathSpec implements PathSpec updatedIngestionSpec = updatedIngestionSpec.withQueryGranularity(config.getGranularitySpec().getQueryGranularity()); + // propagate in the transformSpec from the overall job config + updatedIngestionSpec = updatedIngestionSpec.withTransformSpec( + config.getSchema().getDataSchema().getTransformSpec() + ); + job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec)); job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); diff --git 
a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 1060407da63..e1107ff06c7 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -47,6 +47,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; @@ -322,6 +323,7 @@ public class BatchDeltaIngestionTest Firehose firehose = new IngestSegmentFirehose( ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())), + TransformSpec.NONE, ImmutableList.of("host"), ImmutableList.of("visited_sum", "unique_hosts"), null @@ -363,6 +365,7 @@ public class BatchDeltaIngestionTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL) ), + null, MAPPER ), new HadoopIOConfig( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 5ff2afdcd5d..8150bc6f3a1 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -181,6 +181,7 @@ public class DetermineHashedPartitionsJobTest Granularities.NONE, intervals ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index af7dfba194c..0496cb3d7f5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -239,6 +239,7 @@ public class DeterminePartitionsJobTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of(interval)) ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index f91f9b6d5de..2b134c2aa36 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -73,6 +73,7 @@ public class HadoopDruidIndexerConfigTest Granularities.MINUTE, ImmutableList.of(Intervals.of("2010-01-01/P1D")) ), + null, jsonMapper ), new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null), @@ -136,6 +137,7 @@ public class HadoopDruidIndexerConfigTest Granularities.MINUTE, ImmutableList.of(Intervals.of("2010-01-01/P1D")) ), + null, jsonMapper ), new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java new file mode 100644 index 
00000000000..cf33fb83e19 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.indexer.path.StaticPathSpec; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.jackson.JacksonUtils; +import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.math.expr.ExprMacroTable; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.segment.TestHelper; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.ExpressionTransform; +import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HadoopDruidIndexerMapperTest +{ + private static final ObjectMapper JSON_MAPPER = TestHelper.getJsonMapper(); + private static final DataSchema DATA_SCHEMA = new DataSchema( + "test_ds", + JSON_MAPPER.convertValue( + new HadoopyStringInputRowParser( + new JSONParseSpec( + new TimestampSpec("t", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t", "dim2")), + null, + null + ), + new JSONPathSpec(true, ImmutableList.of()), + ImmutableMap.of() + ) + ), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null, + JSON_MAPPER + ); + + private static final HadoopIOConfig IO_CONFIG = new HadoopIOConfig( + JSON_MAPPER.convertValue( + new StaticPathSpec("dummyPath", null), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ), + null, + "dummyOutputPath" + ); + + private static final HadoopTuningConfig TUNING_CONFIG = HadoopTuningConfig + .makeDefaultTuningConfig() + .withWorkingPath("dummyWorkingPath"); + + @Test + public void 
testHadoopyStringParser() throws Exception + { + final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec(DATA_SCHEMA, IO_CONFIG, TUNING_CONFIG) + ); + + final MyMapper mapper = new MyMapper(); + final Configuration hadoopConfig = new Configuration(); + hadoopConfig.set( + HadoopDruidIndexerConfig.CONFIG_PROPERTY, + HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(config) + ); + final Mapper.Context mapContext = EasyMock.mock(Mapper.Context.class); + EasyMock.expect(mapContext.getConfiguration()).andReturn(hadoopConfig).once(); + EasyMock.replay(mapContext); + mapper.setup(mapContext); + final List> rows = ImmutableList.of( + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "x", "m1", 1.0), + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim2", "y", "m1", 1.0) + ); + for (Map row : rows) { + mapper.map(NullWritable.get(), new Text(JSON_MAPPER.writeValueAsString(row)), mapContext); + } + assertRowListEquals(rows, mapper.getRows()); + } + + @Test + public void testHadoopyStringParserWithTransformSpec() throws Exception + { + final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + DATA_SCHEMA.withTransformSpec( + new TransformSpec( + new SelectorDimFilter("dim1", "foo", null), + ImmutableList.of( + new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()) + ) + ) + ), + IO_CONFIG, + TUNING_CONFIG + ) + ); + + final MyMapper mapper = new MyMapper(); + final Configuration hadoopConfig = new Configuration(); + hadoopConfig.set( + HadoopDruidIndexerConfig.CONFIG_PROPERTY, + HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(config) + ); + final Mapper.Context mapContext = EasyMock.mock(Mapper.Context.class); + EasyMock.expect(mapContext.getConfiguration()).andReturn(hadoopConfig).once(); + EasyMock.replay(mapContext); + mapper.setup(mapContext); + final List> rows = ImmutableList.of( + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim2", "x", "m1", 1.0), + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "bar", "dim2", "y", "m1", 1.0), + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim2", "z", "m1", 1.0) + ); + for (Map row : rows) { + mapper.map(NullWritable.get(), new Text(JSON_MAPPER.writeValueAsString(row)), mapContext); + } + assertRowListEquals( + ImmutableList.of( + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim1t", "foofoo", "dim2", "x", "m1", 1.0), + ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim1t", "foofoo", "dim2", "z", "m1", 1.0) + ), + mapper.getRows() + ); + } + + private static void assertRowListEquals(final List> expected, final List actual) + { + Assert.assertEquals( + expected, + actual.stream().map(HadoopDruidIndexerMapperTest::rowToMap).collect(Collectors.toList()) + ); + } + + private static Map rowToMap(final InputRow row) + { + // Normalize input row for the purposes of testing. 
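+ // The timestamp is rendered back to a string under "t", dimension values are keyed by dimension name, and null values are dropped (ImmutableMap does not accept nulls), so expected rows can be written as plain maps.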
+ final ImmutableMap.Builder builder = ImmutableMap.builder() .put("t", row.getTimestamp().toString()); + + for (String dim : row.getDimensions()) { + final Object val = row.getRaw(dim); + if (val != null) { + builder.put(dim, val); + } + } + + // other, non-dimension fields are not self-describing so must be specified individually + builder.put("m1", row.getRaw("m1")); + return builder.build(); + } + + public static class MyMapper extends HadoopDruidIndexerMapper + { + private final List rows = new ArrayList<>(); + + @Override + protected void innerMap( + final InputRow inputRow, + final Object value, + final Context context, + final boolean reportParseExceptions + ) throws IOException, InterruptedException + { + rows.add(inputRow); + } + + public List getRows() + { + return rows; + } + } +}
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java index 6899ccb3954..e9d92e55c0e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -93,7 +93,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest PathSpec pathSpec = new DatasourcePathSpec( jsonMapper, null, - new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false), + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false, null), null ); HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( @@ -120,7 +120,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest null, null, null, - false + false, + null ), null ); @@ -148,7 +149,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest null, null, null, - false + false, + null ), null ); @@ -173,7 +175,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest null, null, null, - false + false, + null ), null ); @@ -204,7 +207,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest null, null, null, - false + false, + null ), null ) @@ -236,6 +240,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest null, ImmutableList.of(Intervals.of("2010-01-01/P1D")) ), + null, jsonMapper ), new HadoopIOConfig(
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java index f9ddd428e09..9eaf75349ed 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -82,6 +82,7 @@ public class IndexGeneratorCombinerTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2010/2011")) ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig(
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index f70d7f1e1e3..8ed853bef99 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -496,6 +496,7 @@ public class
IndexGeneratorJobTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval) ), + null, mapper ), new HadoopIOConfig( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 5f3c15781c8..f292cfb9608 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -89,6 +89,7 @@ public class JobHelperTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval) ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java index adbeeea4486..cee6f683322 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java @@ -51,7 +51,8 @@ public class DatasourceIngestionSpecTest new SelectorDimFilter("dim", "value", null), Lists.newArrayList("d1", "d2"), Lists.newArrayList("m1", "m2", "m3"), - false + false, + null ); DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class); @@ -85,7 +86,8 @@ public class DatasourceIngestionSpecTest null, null, null, - false + false, + null ); Assert.assertEquals(expected, actual); @@ -133,7 +135,8 @@ public class DatasourceIngestionSpecTest new SelectorDimFilter("dim", "value", null), Lists.newArrayList("d1", "d2"), Lists.newArrayList("m1", "m2", "m3"), - true + true, + null ); actual = MAPPER.readValue( @@ -153,7 +156,7 @@ public class DatasourceIngestionSpecTest DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class); Assert.assertEquals( - new DatasourceIngestionSpec("test", Intervals.of("2014/2015"), null, null, null, null, null, false), + new DatasourceIngestionSpec("test", Intervals.of("2014/2015"), null, null, null, null, null, false, null), actual ); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java index 13a25d1cd1c..237ecfba03f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -69,7 +69,8 @@ public class DatasourceRecordReaderTest null, segment.getDimensions(), segment.getMetrics(), - false + false, + null ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index 466b11b8ba9..ab1b0780a9e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -79,7 +79,8 @@ public class DatasourcePathSpecTest null, null, null, - false + false, + null ); segments = ImmutableList.of( @@ -279,6 +280,7 @@ public class DatasourcePathSpecTest new UniformGranularitySpec( Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2000/3000")) ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig( diff --git 
a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index 9e3a0447530..3aec576f4f5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -153,6 +153,7 @@ public class GranularityPathSpecTest Granularities.MINUTE, ImmutableList.of(Intervals.of("2015-11-06T00:00Z/2015-11-07T00:00Z")) ), + null, jsonMapper ), new HadoopIOConfig(null, null, null), @@ -204,6 +205,7 @@ public class GranularityPathSpecTest Granularities.ALL, ImmutableList.of(Intervals.of("2015-01-01T11Z/2015-01-02T05Z")) ), + null, jsonMapper ), new HadoopIOConfig(null, null, null), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java index ae75d6637db..59b8ed0e210 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java @@ -54,7 +54,7 @@ public class StaticPathSpecTest Job job = new Job(); StaticPathSpec pathSpec = new StaticPathSpec("/a/c,/a/b/{c,d}", null); - DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, jsonMapper); + DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, null, jsonMapper); HadoopIOConfig io = new HadoopIOConfig(null, null, null); pathSpec.addInputPaths(new HadoopDruidIndexerConfig(new HadoopIngestionSpec(schema, io, null)), job); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 61350ac9a88..2b7766f7f72 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -182,6 +182,7 @@ public class HadoopConverterJobTest Granularities.DAY, ImmutableList.of(interval) ), + null, HadoopDruidIndexerConfig.JSON_MAPPER ), new HadoopIOConfig( diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index d4be01831cc..861083b75f5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -659,6 +659,7 @@ public class IndexTask extends AbstractTask final InputRow inputRow = firehose.nextRow(); if (inputRow == null) { + fireDepartmentMetrics.incrementThrownAway(); continue; } @@ -668,7 +669,6 @@ public class IndexTask extends AbstractTask continue; } - final String sequenceName; if (isGuaranteedRollup) { diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 231c8b19ebd..18427125f9e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -42,6 +42,7 @@ import io.druid.java.util.common.parsers.ParseException; import io.druid.query.filter.DimFilter; import io.druid.segment.IndexIO; import 
io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.segment.realtime.firehose.WindowedStorageAdapter; @@ -281,7 +282,8 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 23df0a6575e..e0606a8f851 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -20,6 +20,7 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Files; @@ -48,12 +49,16 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.parsers.ParseException; +import io.druid.math.expr.ExprMacroTable; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.ExpressionTransform; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; @@ -209,6 +214,50 @@ public class IndexTaskTest Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); } + @Test + public void testTransformSpec() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,1\n"); + writer.write("2014-01-01T02:00:30Z,c,1\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + null, + new TransformSpec( + new SelectorDimFilter("dim", "b", null), + ImmutableList.of( + new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()) + ) + ), + null, + createTuningConfig(2, null, true, false), + false + ), + null + ); + + Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); + Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); + Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); + } + @Test public void testWithArbitraryGranularity() throws Exception { @@ -1012,6 +1061,18 @@ public class IndexTaskTest IndexTuningConfig tuningConfig, boolean appendToExisting ) + { + return createIngestionSpec(baseDir, parseSpec, TransformSpec.NONE, granularitySpec, tuningConfig, 
appendToExisting); + } + + private IndexTask.IndexIngestionSpec createIngestionSpec( + File baseDir, + ParseSpec parseSpec, + TransformSpec transformSpec, + GranularitySpec granularitySpec, + IndexTuningConfig tuningConfig, + boolean appendToExisting + ) { return new IndexTask.IndexIngestionSpec( new DataSchema( @@ -1031,6 +1092,7 @@ public class IndexTaskTest Granularities.MINUTE, Arrays.asList(Intervals.of("2014/2015")) ), + transformSpec, jsonMapper ), new IndexTask.IndexIOConfig( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f236274939f..a5b3c95a1df 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -30,21 +30,20 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -//CHECKSTYLE.OFF: Regexp -import com.metamx.common.logger.Logger; -//CHECKSTYLE.ON: Regexp import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.core.NoopEmitter; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; -import io.druid.java.util.common.concurrent.Execs; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; import io.druid.discovery.DataNodeService; import io.druid.discovery.DruidNodeAnnouncer; import io.druid.discovery.LookupNodeService; @@ -72,9 +71,13 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.jackson.JacksonUtils; +import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; +import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.EntryExistsException; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; @@ -91,14 +94,19 @@ import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.SelectorDimFilter; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryEngine; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; +import 
io.druid.segment.indexing.ExpressionTransform; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; @@ -130,6 +138,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -140,25 +149,26 @@ import java.util.concurrent.Executor; public class RealtimeIndexTaskTest { private static final Logger log = new Logger(RealtimeIndexTaskTest.class); - private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ServiceEmitter emitter = new ServiceEmitter( "service", "host", - new LoggingEmitter( - log, - LoggingEmitter.Level.ERROR, - jsonMapper - ) + new NoopEmitter() ); private static final String FAIL_DIM = "__fail__"; private static class TestFirehose implements Firehose { - private final List queue = Lists.newLinkedList(); + private final InputRowParser> parser; + private final List> queue = new LinkedList<>(); private boolean closed = false; - public void addRows(List rows) + public TestFirehose(final InputRowParser> parser) + { + this.parser = parser; + } + + public void addRows(List> rows) { synchronized (this) { queue.addAll(rows); @@ -187,8 +197,8 @@ public class RealtimeIndexTaskTest public InputRow nextRow() { synchronized (this) { - final InputRow row = queue.remove(0); - if (row != null && row.getDimensions().contains(FAIL_DIM)) { + final InputRow row = parser.parse(queue.remove(0)); + if (row != null && row.getRaw(FAIL_DIM) != null) { throw new ParseException(FAIL_DIM); } return row; @@ -198,14 +208,7 @@ public class RealtimeIndexTaskTest @Override public Runnable commit() { - return new Runnable() - { - @Override - public void run() - { - // do nothing - } - }; + return () -> {}; } @Override @@ -218,16 +221,17 @@ public class RealtimeIndexTaskTest } } - private static class TestFirehoseFactory implements FirehoseFactory + private static class TestFirehoseFactory implements FirehoseFactory { public TestFirehoseFactory() { } @Override + @SuppressWarnings("unchecked") public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException, ParseException { - return new TestFirehose(); + return new TestFirehose(parser); } } @@ -277,7 +281,7 @@ public class RealtimeIndexTaskTest public void testHandoffTimeout() throws Exception { final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); - final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L); + final RealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L); final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); @@ -289,12 +293,8 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "1") - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1") ) ); @@ -331,22 +331,10 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) 
task.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "1") - ), - new MapBasedInputRow( - now.minus(new Period("P1D")), - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", 2.0) - ), - new MapBasedInputRow( - now, - ImmutableList.of("dim2"), - ImmutableMap.of("dim2", "bar", "met1", 2.0) - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) ) ); @@ -366,8 +354,79 @@ public class RealtimeIndexTaskTest Assert.assertEquals(0, task.getMetrics().unparseable()); // Do some queries. - Assert.assertEquals(2, sumMetric(task, "rows")); - Assert.assertEquals(3, sumMetric(task, "met1")); + Assert.assertEquals(2, sumMetric(task, null, "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testTransformSpec() throws Exception + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final TransformSpec transformSpec = new TransformSpec( + new SelectorDimFilter("dim1", "foo", null), + ImmutableList.of( + new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()) + ) + ); + final RealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0); + final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final ListenableFuture statusFuture = runTask(task, taskToolbox); + final DataSegment publishedSegment; + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + + // Check metrics. + Assert.assertEquals(1, task.getMetrics().processed()); + Assert.assertEquals(2, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(1, sumMetric(task, null, "rows")); + Assert.assertEquals(1, sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", null), "rows")); + Assert.assertEquals(0, sumMetric(task, new SelectorDimFilter("dim1t", "barbar", null), "rows")); + Assert.assertEquals(1, sumMetric(task, null, "met1")); // Simulate handoff. 
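// (Illustrative comment, not part of the patch: the loop below replays the handoff callbacks the task
// registered, running each Runnable on its paired Executor; once every published segment has been
// "handed off" this way, the task can finish and report SUCCESS.)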
for (Map.Entry> entry : handOffCallbacks.entrySet()) { @@ -405,27 +464,11 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "1") - ), - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "foo") - ), - new MapBasedInputRow( - now.minus(new Period("P1D")), - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "foo") - ), - new MapBasedInputRow( - now, - ImmutableList.of("dim2"), - ImmutableMap.of("dim2", "bar", "met1", 2.0) - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), + ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) ) ); @@ -446,7 +489,7 @@ public class RealtimeIndexTaskTest CoreMatchers.allOf( CoreMatchers.instanceOf(ParseException.class), ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]") + CoreMatchers.containsString("Unable to parse value[foo] for field[met1]") ) ) ) @@ -472,39 +515,24 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task.getFirehose(); firehose.addRows( - Arrays.asList( + Arrays.asList( // Good row- will be processed. - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "1") - ), - // Null row- will be unparseable. + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + + // Null row- will be thrown away. null, + // Bad metric- will count as processed, but that particular metric won't update. - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", "foo") - ), + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), + // Bad row- will be unparseable. - new MapBasedInputRow( - now, - ImmutableList.of("dim1", FAIL_DIM), - ImmutableMap.of("dim1", "foo", "met1", 2.0) - ), + ImmutableMap.of("dim1", "foo", "met1", 2.0, FAIL_DIM, "x"), + // Old row- will be thrownAway. - new MapBasedInputRow( - now.minus(new Period("P1D")), - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo", "met1", 2.0) - ), + ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0), + // Good row- will be processed. - new MapBasedInputRow( - now, - ImmutableList.of("dim2"), - ImmutableMap.of("dim2", "bar", "met1", 2.0) - ) + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) ) ); @@ -524,8 +552,8 @@ public class RealtimeIndexTaskTest Assert.assertEquals(2, task.getMetrics().unparseable()); // Do some queries. - Assert.assertEquals(3, sumMetric(task, "rows")); - Assert.assertEquals(3, sumMetric(task, "met1")); + Assert.assertEquals(3, sumMetric(task, null, "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); // Simulate handoff. 
for (Map.Entry> entry : handOffCallbacks.entrySet()) { @@ -568,12 +596,8 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo") - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") ) ); @@ -601,17 +625,13 @@ public class RealtimeIndexTaskTest } // Do a query, at this point the previous data should be loaded. - Assert.assertEquals(1, sumMetric(task2, "rows")); + Assert.assertEquals(1, sumMetric(task2, null, "rows")); final TestFirehose firehose = (TestFirehose) task2.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim2"), - ImmutableMap.of("dim2", "bar") - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim2", "bar") ) ); @@ -626,7 +646,7 @@ public class RealtimeIndexTaskTest publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); // Do a query. - Assert.assertEquals(2, sumMetric(task2, "rows")); + Assert.assertEquals(2, sumMetric(task2, null, "rows")); // Simulate handoff. for (Map.Entry> entry : handOffCallbacks.entrySet()) { @@ -671,12 +691,8 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo") - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") ) ); @@ -691,7 +707,7 @@ public class RealtimeIndexTaskTest publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); // Do a query. - Assert.assertEquals(1, sumMetric(task1, "rows")); + Assert.assertEquals(1, sumMetric(task1, null, "rows")); // Trigger graceful shutdown. 
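// (Illustrative comment, an assumption about intent: stopGracefully() asks the task to shut down in a
// restorable way rather than aborting outright, which is the behavior the restore-oriented tests in
// this class exercise.)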
task1.stopGracefully(); @@ -768,12 +784,8 @@ public class RealtimeIndexTaskTest final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( - ImmutableList.of( - new MapBasedInputRow( - now, - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo") - ) + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") ) ); @@ -863,22 +875,40 @@ public class RealtimeIndexTaskTest private RealtimeIndexTask makeRealtimeTask(final String taskId) { - return makeRealtimeTask(taskId, true, 0); + return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0); } private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions) { - return makeRealtimeTask(taskId, reportParseExceptions, 0); + return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0); } - private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout) + private RealtimeIndexTask makeRealtimeTask( + final String taskId, + final TransformSpec transformSpec, + final boolean reportParseExceptions, + final long handoffTimeout + ) { ObjectMapper objectMapper = new DefaultObjectMapper(); DataSchema dataSchema = new DataSchema( "test_ds", - null, + TestHelper.getJsonMapper().convertValue( + new MapInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec("t", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t")), + null, + null + ) + ) + ), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ), new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")}, new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + transformSpec, objectMapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( @@ -1065,11 +1095,12 @@ public class RealtimeIndexTaskTest return toolboxFactory.build(task); } - public long sumMetric(final Task task, final String metric) throws Exception + public long sumMetric(final Task task, final DimFilter filter, final String metric) throws Exception { // Do a query. 
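// (Illustrative comment, not part of the patch: this helper issues a timeseries query over the task's
// own data and long-sums the named column; the new DimFilter parameter lets tests count only rows that
// match a transformed value, e.g. sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", null), "rows").)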
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("test_ds") + .filters(filter) .aggregators( ImmutableList.of( new LongSumAggregatorFactory(metric, metric) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 80bf2947670..83cdb0887c3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -185,6 +185,7 @@ public class TaskSerdeTest null, ImmutableList.of(Intervals.of("2010-01-01/P2D")) ), + null, jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), @@ -247,6 +248,7 @@ public class TaskSerdeTest null, ImmutableList.of(Intervals.of("2010-01-01/P2D")) ), + null, jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), @@ -469,6 +471,7 @@ public class TaskSerdeTest null, new AggregatorFactory[0], new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ), new RealtimeIOConfig( @@ -760,6 +763,7 @@ public class TaskSerdeTest null, ImmutableList.of(Intervals.of("2010-01-01/P1D")) ), + null, jsonMapper ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null ), diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index f935e94be14..bdda4aae5ca 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -33,14 +33,13 @@ import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Module; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.java.util.common.Intervals; -import io.druid.java.util.common.JodaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.data.input.impl.TimeAndDimsParseSpec; import io.druid.data.input.impl.TimestampSpec; import io.druid.guice.GuiceAnnotationIntrospector; import io.druid.guice.GuiceInjectableValues; @@ -56,8 +55,11 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.IOE; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; +import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -65,8 +67,12 @@ import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; +import io.druid.segment.TestHelper; +import io.druid.segment.column.Column; import io.druid.segment.incremental.IncrementalIndex; import 
io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.indexing.ExpressionTransform; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; @@ -115,7 +121,7 @@ public class IngestSegmentFirehoseFactoryTest static { TestUtils testUtils = new TestUtils(); - MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper()); + MAPPER = setupInjectablesInObjectMapper(TestHelper.getJsonMapper()); INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9(); INDEX_IO = testUtils.getTestIndexIO(); } @@ -394,7 +400,9 @@ public class IngestSegmentFirehoseFactoryTest ) { this.factory = factory; - this.rowParser = rowParser; + + // Must decorate the parser, since IngestSegmentFirehoseFactory will undecorate it. + this.rowParser = TransformSpec.NONE.decorate(rowParser); } private static final Logger log = new Logger(IngestSegmentFirehoseFactoryTest.class); @@ -420,15 +428,13 @@ public class IngestSegmentFirehoseFactoryTest private final InputRowParser rowParser; private static final InputRowParser> ROW_PARSER = new MapInputRowParser( - new JSONParseSpec( + new TimeAndDimsParseSpec( new TimestampSpec(TIME_COLUMN, "auto", null), new DimensionsSpec( DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)), ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), - ImmutableList.of() - ), - null, - null + ImmutableList.of() + ) ) ); @@ -533,6 +539,42 @@ public class IngestSegmentFirehoseFactoryTest Assert.assertEquals((int) MAX_SHARD_NUMBER * MAX_ROWS, (int) rowcount); } + @Test + public void testTransformSpec() throws IOException + { + Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); + Integer rowcount = 0; + final TransformSpec transformSpec = new TransformSpec( + new SelectorDimFilter(Column.TIME_COLUMN_NAME, "1", null), + ImmutableList.of( + new ExpressionTransform(METRIC_FLOAT_NAME, METRIC_FLOAT_NAME + " * 10", ExprMacroTable.nil()) + ) + ); + int skipped = 0; + try (final IngestSegmentFirehose firehose = + (IngestSegmentFirehose) + factory.connect(transformSpec.decorate(rowParser), null)) { + while (firehose.hasMore()) { + InputRow row = firehose.nextRow(); + if (row == null) { + skipped++; + continue; + } + Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); + Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); + Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME).longValue()); + Assert.assertEquals( + METRIC_FLOAT_VALUE * 10, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); + ++rowcount; + } + } + Assert.assertEquals(90, skipped); + Assert.assertEquals((int) MAX_ROWS, (int) rowcount); + } + private static ServiceEmitter newMockEmitter() { return new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 822301459e3..58d21ee3aa8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -59,6 +59,7 @@ import io.druid.segment.IndexSpec; import io.druid.segment.incremental.IncrementalIndex; import 
io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.indexing.TransformSpec; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; @@ -91,16 +92,20 @@ public class IngestSegmentFirehoseFactoryTimelineTest private static final String TIME_COLUMN = "t"; private static final String[] DIMENSIONS = new String[]{"d1"}; private static final String[] METRICS = new String[]{"m1"}; - private static final InputRowParser> ROW_PARSER = new MapInputRowParser( - new JSONParseSpec( - new TimestampSpec(TIME_COLUMN, "auto", null), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), + + // Must decorate the parser, since IngestSegmentFirehoseFactory will undecorate it. + private static final InputRowParser> ROW_PARSER = TransformSpec.NONE.decorate( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec(TIME_COLUMN, "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), + null, + null + ), null, null - ), - null, - null + ) ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 3e5110d4b94..a15306e2398 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -129,6 +129,7 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.net.URI; @@ -257,6 +258,7 @@ public class TaskLifecycleTest return true; } + @Nullable @Override public InputRow nextRow() { @@ -311,6 +313,7 @@ public class TaskLifecycleTest return inputRowIterator.hasNext(); } + @Nullable @Override public InputRow nextRow() { @@ -661,6 +664,7 @@ public class TaskLifecycleTest null, ImmutableList.of(Intervals.of("2010-01-01/P2D")) ), + null, mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), @@ -718,6 +722,7 @@ public class TaskLifecycleTest null, ImmutableList.of(Intervals.of("2010-01-01/P1D")) ), + null, mapper ), new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), @@ -1082,6 +1087,7 @@ public class TaskLifecycleTest null, ImmutableList.of(Intervals.of("2010-01-01/P2D")) ), + null, mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), @@ -1183,6 +1189,7 @@ public class TaskLifecycleTest null, new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null, mapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 1e147f1517f..fcd01e7f642 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -58,7 +58,7 @@ public class TaskAnnouncementTest "theid", new TaskResource("rofl", 2), new FireDepartment( - new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()), + new 
DataSchema("foo", null, new AggregatorFactory[0], null, null, new DefaultObjectMapper()), new RealtimeIOConfig( new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() { diff --git a/server/src/main/java/io/druid/segment/indexing/DataSchema.java b/server/src/main/java/io/druid/segment/indexing/DataSchema.java index bc4c196612d..5dd5815c78c 100644 --- a/server/src/main/java/io/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/io/druid/segment/indexing/DataSchema.java @@ -50,6 +50,7 @@ public class DataSchema private final Map parser; private final AggregatorFactory[] aggregators; private final GranularitySpec granularitySpec; + private final TransformSpec transformSpec; private final ObjectMapper jsonMapper; @@ -61,12 +62,14 @@ public class DataSchema @JsonProperty("parser") Map parser, @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("transformSpec") TransformSpec transformSpec, @JacksonInject ObjectMapper jsonMapper ) { this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper."); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource."); this.parser = parser; + this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec; if (granularitySpec == null) { log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default."); @@ -114,7 +117,9 @@ public class DataSchema return cachedParser; } - final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class); + final InputRowParser inputRowParser = transformSpec.decorate( + jsonMapper.convertValue(this.parser, InputRowParser.class) + ); final Set dimensionExclusions = Sets.newHashSet(); for (AggregatorFactory aggregator : aggregators) { @@ -149,12 +154,12 @@ public class DataSchema cachedParser = inputRowParser.withParseSpec( inputRowParser.getParseSpec() - .withDimensionsSpec( - dimensionsSpec - .withDimensionExclusions( - Sets.difference(dimensionExclusions, dimSet) + .withDimensionsSpec( + dimensionsSpec + .withDimensionExclusions( + Sets.difference(dimensionExclusions, dimSet) + ) ) - ) ); } else { cachedParser = inputRowParser; @@ -179,9 +184,20 @@ public class DataSchema return granularitySpec; } + @JsonProperty + public TransformSpec getTransformSpec() + { + return transformSpec; + } + public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { - return new DataSchema(dataSource, parser, aggregators, granularitySpec, jsonMapper); + return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper); + } + + public DataSchema withTransformSpec(TransformSpec transformSpec) + { + return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper); } @Override @@ -192,6 +208,7 @@ public class DataSchema ", parser=" + parser + ", aggregators=" + Arrays.toString(aggregators) + ", granularitySpec=" + granularitySpec + + ", transformSpec=" + transformSpec + '}'; } } diff --git a/server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java b/server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java new file mode 100644 index 00000000000..7369a2fce8c --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java @@ -0,0 +1,125 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.data.input.Row; +import io.druid.math.expr.Expr; +import io.druid.math.expr.ExprMacroTable; +import io.druid.math.expr.Parser; +import io.druid.segment.column.Column; + +import java.util.Objects; + +public class ExpressionTransform implements Transform +{ + private final String name; + private final String expression; + private final ExprMacroTable macroTable; + + @JsonCreator + public ExpressionTransform( + @JsonProperty("name") final String name, + @JsonProperty("expression") final String expression, + @JacksonInject ExprMacroTable macroTable + ) + { + this.name = Preconditions.checkNotNull(name, "name"); + this.expression = Preconditions.checkNotNull(expression, "expression"); + this.macroTable = macroTable; + } + + @JsonProperty + @Override + public String getName() + { + return name; + } + + @JsonProperty + public String getExpression() + { + return expression; + } + + @Override + public RowFunction getRowFunction() + { + final Expr expr = Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable")); + return new ExpressionRowFunction(expr); + } + + static class ExpressionRowFunction implements RowFunction + { + private final Expr expr; + + ExpressionRowFunction(final Expr expr) + { + this.expr = expr; + } + + @Override + public Object eval(final Row row) + { + return expr.eval(name -> getValueFromRow(row, name)).value(); + } + } + + private static Object getValueFromRow(final Row row, final String column) + { + if (column.equals(Column.TIME_COLUMN_NAME)) { + return row.getTimestampFromEpoch(); + } else { + return row.getRaw(column); + } + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ExpressionTransform that = (ExpressionTransform) o; + return Objects.equals(name, that.name) && + Objects.equals(expression, that.expression); + } + + @Override + public int hashCode() + { + return Objects.hash(name, expression); + } + + @Override + public String toString() + { + return "ExpressionTransform{" + + "name='" + name + '\'' + + ", expression='" + expression + '\'' + + '}'; + } +} diff --git a/server/src/main/java/io/druid/segment/indexing/RowFunction.java b/server/src/main/java/io/druid/segment/indexing/RowFunction.java new file mode 100644 index 00000000000..c9dabd744c5 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/RowFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import io.druid.data.input.Row; + +/** + * Interface for evaluating functions on rows. Used by {@link Transformer}. + */ +public interface RowFunction +{ + Object eval(Row row); +} diff --git a/server/src/main/java/io/druid/segment/indexing/Transform.java b/server/src/main/java/io/druid/segment/indexing/Transform.java new file mode 100644 index 00000000000..9c8253f066b --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/Transform.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each + * one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc. + * Each also has a "row function", which produces values for this new field based on looking at the entire input row. + * + * If a transform has the same name as a field in an input row, then it will shadow the original field. Transforms + * that shadow fields may still refer to the fields they shadow. This can be used to transform a field "in-place". + * + * Transforms do have some limitations. They can only refer to fields present in the actual input rows; in particular, + * they cannot refer to other transforms. And they cannot remove fields, only add them. However, they can shadow a + * field with another field containing all nulls, which will act similarly to removing the field. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "expression", value = ExpressionTransform.class) +}) +public interface Transform +{ + /** + * Returns the field name for this transform. + */ + String getName(); + + /** + * Returns the function for this transform. 
The RowFunction takes an entire row as input and returns a column value + as output. + */ + RowFunction getRowFunction(); +} diff --git a/server/src/main/java/io/druid/segment/indexing/TransformSpec.java b/server/src/main/java/io/druid/segment/indexing/TransformSpec.java new file mode 100644 index 00000000000..ff441834a07 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/TransformSpec.java @@ -0,0 +1,147 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.java.util.common.ISE; +import io.druid.query.filter.DimFilter; + +import javax.annotation.Nullable; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Specifies how input rows should be filtered and transformed. There are two parts: a "filter" (which can filter out + * input rows) and "transforms" (which can add fields to input rows). Filters may refer to fields generated by + * a transform. + * + * See {@link Transform} for details on how each transform works. + */ +public class TransformSpec +{ + public static final TransformSpec NONE = new TransformSpec(null, null); + + private final DimFilter filter; + private final List<Transform> transforms; + + @JsonCreator + public TransformSpec( + @JsonProperty("filter") final DimFilter filter, + @JsonProperty("transforms") final List<Transform> transforms + ) + { + this.filter = filter; + this.transforms = transforms == null ? ImmutableList.of() : transforms; + + // Check for name collisions. + final Set<String> seen = new HashSet<>(); + for (Transform transform : this.transforms) { + if (!seen.add(transform.getName())) { + throw new ISE("Transform name '%s' cannot be used twice", transform.getName()); + } + } + } + + public static TransformSpec fromInputRowParser(final InputRowParser<?> parser) + { + // Hack: some firehoses and input specs must extract transformSpec from the parser, since they do not + // actually use the parser, but still must respect the transformSpec. This method should extract whatever + // transformSpec "decorate" had put in.
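// (Illustrative sketch, not part of the patch. Field names follow the @JsonProperty and @JsonSubTypes
// annotations in this patch; the selector-filter JSON shape is an assumption based on Druid's existing
// DimFilter serialization. A transformSpec inside a dataSchema would look like:
//
//   "transformSpec": {
//     "filter": { "type": "selector", "dimension": "dim1", "value": "foo" },
//     "transforms": [
//       { "type": "expression", "name": "dim1t", "expression": "concat(dim1,dim1)" }
//     ]
//   }
// )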
+ + if (parser instanceof TransformingInputRowParser) { + return ((TransformingInputRowParser) parser).getTransformSpec(); + } else if (parser instanceof TransformingStringInputRowParser) { + return ((TransformingStringInputRowParser) parser).getTransformSpec(); + } else { + throw new ISE("Parser was not decorated, but should have been"); + } + } + + @JsonProperty + @Nullable + public DimFilter getFilter() + { + return filter; + } + + @JsonProperty + public List<Transform> getTransforms() + { + return transforms; + } + + public <T> InputRowParser<T> decorate(final InputRowParser<T> parser) + { + // Always decorates, even if the transformSpec is a no-op. This is so fromInputRowParser can insist that the + // parser is a transforming parser, and possibly help detect coding errors where someone forgot to call "decorate". + + if (parser instanceof StringInputRowParser) { + // Hack to support the fact that some callers use special methods in StringInputRowParser, such as + // parse(String) and startFileFromBeginning. + return (InputRowParser<T>) new TransformingStringInputRowParser( + parser.getParseSpec(), + ((StringInputRowParser) parser).getEncoding(), + this + ); + } else { + return new TransformingInputRowParser<>(parser, this); + } + } + + public Transformer toTransformer() + { + return new Transformer(this); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TransformSpec that = (TransformSpec) o; + return Objects.equals(filter, that.filter) && + Objects.equals(transforms, that.transforms); + } + + @Override + public int hashCode() + { + return Objects.hash(filter, transforms); + } + + @Override + public String toString() + { + return "TransformSpec{" + + "filter=" + filter + + ", transforms=" + transforms + + '}'; + } +} diff --git a/server/src/main/java/io/druid/segment/indexing/Transformer.java b/server/src/main/java/io/druid/segment/indexing/Transformer.java new file mode 100644 index 00000000000..8a18f898c43 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/Transformer.java @@ -0,0 +1,193 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.segment.indexing; + +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import io.druid.data.input.Rows; +import io.druid.java.util.common.DateTimes; +import io.druid.query.filter.ValueMatcher; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; +import io.druid.segment.column.Column; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Applies a {@link TransformSpec} to input rows: transform fields are evaluated on demand, and rows that do not + * match the filter are dropped. + */ +public class Transformer +{ + private final Map<String, RowFunction> transforms = new HashMap<>(); + private final ThreadLocal<Row> rowSupplierForValueMatcher = new ThreadLocal<>(); + private final ValueMatcher valueMatcher; + + Transformer(final TransformSpec transformSpec) + { + for (final Transform transform : transformSpec.getTransforms()) { + transforms.put(transform.getName(), transform.getRowFunction()); + } + + if (transformSpec.getFilter() != null) { + valueMatcher = transformSpec.getFilter().toFilter() + .makeMatcher( + RowBasedColumnSelectorFactory.create( + rowSupplierForValueMatcher, + null + ) + ); + } else { + valueMatcher = null; + } + } + + /** + * Transforms an input row, or returns null if the row should be filtered out. + * + * @param row the input row + */ + @Nullable + public InputRow transform(@Nullable final InputRow row) + { + if (row == null) { + return null; + } + + final InputRow transformedRow; + + if (transforms.isEmpty()) { + transformedRow = row; + } else { + transformedRow = new TransformedInputRow(row, transforms); + } + + if (valueMatcher != null) { + rowSupplierForValueMatcher.set(transformedRow); + if (!valueMatcher.matches()) { + return null; + } + } + + return transformedRow; + } + + public static class TransformedInputRow implements InputRow + { + private final InputRow row; + private final Map<String, RowFunction> transforms; + + public TransformedInputRow(final InputRow row, final Map<String, RowFunction> transforms) + { + this.row = row; + this.transforms = transforms; + } + + @Override + public List<String> getDimensions() + { + return row.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + final RowFunction transform = transforms.get(Column.TIME_COLUMN_NAME); + if (transform != null) { + return Rows.objectToNumber(Column.TIME_COLUMN_NAME, transform.eval(row)).longValue(); + } else { + return row.getTimestampFromEpoch(); + } + } + + @Override + public DateTime getTimestamp() + { + final RowFunction transform = transforms.get(Column.TIME_COLUMN_NAME); + if (transform != null) { + return DateTimes.utc(getTimestampFromEpoch()); + } else { + return row.getTimestamp(); + } + } + + @Override + public List<String> getDimension(final String dimension) + { + final RowFunction transform = transforms.get(dimension); + if (transform != null) { + return Rows.objectToStrings(transform.eval(row)); + } else { + return row.getDimension(dimension); + } + } + + @Override + public Object getRaw(final String column) + { + final RowFunction transform = transforms.get(column); + if (transform != null) { + return transform.eval(row); + } else { + return row.getRaw(column); + } + } + + @Override + public Number getMetric(final String metric) + { + final RowFunction transform = transforms.get(metric); + if (transform != null) { + return Rows.objectToNumber(metric, transform.eval(row)); + } else { + return row.getMetric(metric); + } + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final
TransformedInputRow that = (TransformedInputRow) o; + return Objects.equals(row, that.row) && + Objects.equals(transforms, that.transforms); + } + + @Override + public int hashCode() + { + return Objects.hash(row, transforms); + } + + @Override + public int compareTo(final Row o) + { + return row.compareTo(o); + } + } +} diff --git a/server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java b/server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java new file mode 100644 index 00000000000..96151b585f6 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java @@ -0,0 +1,62 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; + +public class TransformingInputRowParser implements InputRowParser +{ + private final InputRowParser parser; + private final TransformSpec transformSpec; + private final Transformer transformer; + + public TransformingInputRowParser(final InputRowParser parser, final TransformSpec transformSpec) + { + this.parser = parser; + this.transformSpec = transformSpec; + this.transformer = transformSpec.toTransformer(); + } + + @Override + public InputRow parse(final T row) + { + return transformer.transform(parser.parse(row)); + } + + @Override + public ParseSpec getParseSpec() + { + return parser.getParseSpec(); + } + + @Override + @SuppressWarnings("unchecked") + public InputRowParser withParseSpec(final ParseSpec parseSpec) + { + return new TransformingInputRowParser<>(parser.withParseSpec(parseSpec), transformSpec); + } + + public TransformSpec getTransformSpec() + { + return transformSpec; + } +} diff --git a/server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java b/server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java new file mode 100644 index 00000000000..8a241969136 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.indexing; + +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.StringInputRowParser; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class TransformingStringInputRowParser extends StringInputRowParser +{ + private final TransformSpec transformSpec; + private final Transformer transformer; + + public TransformingStringInputRowParser( + final ParseSpec parseSpec, + final String encoding, + final TransformSpec transformSpec + ) + { + super(parseSpec, encoding); + this.transformSpec = transformSpec; + this.transformer = transformSpec.toTransformer(); + } + + @Override + public InputRow parse(final ByteBuffer input) + { + return transformer.transform(super.parse(input)); + } + + @Nullable + @Override + public InputRow parse(@Nullable final String input) + { + return transformer.transform(super.parse(input)); + } + + @Override + public StringInputRowParser withParseSpec(final ParseSpec parseSpec) + { + return new TransformingStringInputRowParser(parseSpec, getEncoding(), transformSpec); + } + + public TransformSpec getTransformSpec() + { + return transformSpec; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index 4dc3c962752..4267f6409a2 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -29,6 +29,7 @@ import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -110,6 +111,7 @@ public class CombiningFirehoseFactory implements FirehoseFactory return currentFirehose.hasMore(); } + @Nullable @Override public InputRow nextRow() { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index 70b879360a9..59044ae2c9a 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -37,7 +37,7 @@ import io.druid.java.util.common.concurrent.Execs; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.InputRowParser; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.DateTimes; @@ -52,6 +52,7 @@ import io.druid.server.security.ResourceAction; import io.druid.server.security.ResourceType; import org.joda.time.DateTime; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import 
javax.ws.rs.Consumes; import javax.ws.rs.POST; @@ -81,7 +82,7 @@ import java.util.concurrent.atomic.AtomicLong; * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these * firehoses with an {@link ServiceAnnouncingChatHandlerProvider}. */ -public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> +public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>> { public static final int MAX_FIREHOSE_PRODUCERS = 10_000; @@ -119,7 +120,10 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> @Override - public Firehose connect(MapInputRowParser firehoseParser, File temporaryDirectory) throws IOException + public Firehose connect( + final InputRowParser<Map<String, Object>> firehoseParser, + File temporaryDirectory + ) throws IOException { log.info("Connecting firehose: %s", serviceName); final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser); @@ -155,7 +159,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> private final BlockingQueue<InputRow> buffer; - private final MapInputRowParser parser; + private final InputRowParser<Map<String, Object>> parser; private final Object readLock = new Object(); @@ -165,7 +169,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>(); - public EventReceiverFirehose(MapInputRowParser parser) + public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) { this.buffer = new ArrayBlockingQueue<>(bufferSize); this.parser = parser; @@ -185,7 +189,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java + private final Transformer transformer; private Yielder<InputRow> rowYielder; public IngestSegmentFirehose( final List<WindowedStorageAdapter> adapters, + final TransformSpec transformSpec, final List<String> dims, final List<String> metrics, final DimFilter dimFilter ) { + this.transformer = transformSpec.toTransformer(); + Sequence<InputRow> rows = Sequences.concat( Iterables.transform( adapters, new Function<WindowedStorageAdapter, Sequence<InputRow>>() @@ -184,12 +190,13 @@ return !rowYielder.isDone(); } + @Nullable @Override public InputRow nextRow() { final InputRow inputRow = rowYielder.get(); rowYielder = rowYielder.next(null); - return inputRow; + return transformer.transform(inputRow); } @Override diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index d1fd632672c..456790533d0 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -33,11 +33,13 @@ import com.ircclouds.irc.api.state.IIRCState; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Pair; import io.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.List; @@ -61,7 +63,7 @@ import java.util.concurrent.TimeUnit; * ); * } */ -public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser> +public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<DateTime, ChannelPrivMsg>>> { private static final Logger log = new Logger(IrcFirehoseFactory.class); @@ -101,7 +103,10 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser> } @Override - public Firehose connect(final IrcInputRowParser firehoseParser, File temporaryDirectory) throws IOException + public Firehose connect( + final InputRowParser<Pair<DateTime, ChannelPrivMsg>> firehoseParser, + final File temporaryDirectory + ) throws IOException { final IRCApi irc = new IRCApiImpl(false); final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>(); @@ -212,6 +217,7 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser> } } + @Nullable @Override public InputRow nextRow() { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/PredicateFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/PredicateFirehose.java index 1310d993c35..7b216b4ced4 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/PredicateFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/PredicateFirehose.java @@ -25,6 +25,7 @@ import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.IOException; /** @@ -71,6 +72,7 @@ public class PredicateFirehose implements Firehose return false; } + @Nullable @Override public InputRow nextRow() { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 3fbd9ac8ae9..68a19c4482e 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -29,6 +29,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; @@ -102,6 +103,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory + @Nullable @Override public InputRow nextRow() { diff --git a/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java + Map<String, Object> parserMap = jsonMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("time", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")), + ImmutableList.of(), + null + ), + null, + null + ), + null + ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + DataSchema schema = new DataSchema( + "test", + parserMap, + new AggregatorFactory[]{ + new DoubleSumAggregatorFactory("metric1", "col1"), + new DoubleSumAggregatorFactory("metric2", "col2"), + }, + new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + new TransformSpec( + new SelectorDimFilter("dimA", "foo", null), + ImmutableList.of( + new ExpressionTransform("expr", "concat(dimA,dimA)", TestExprMacroTable.INSTANCE) + ) + ), + jsonMapper + ); + + // Test hack that produces a StringInputRowParser.
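+ // Reviewer note: the cast below relies on the decorate() contract shown earlier in this patch. decorate()
+ // always wraps the parser, and for a StringInputRowParser it returns a TransformingStringInputRowParser,
+ // preserving the special parse(String) / parse(ByteBuffer) entry points while still applying the transforms
+ // and the filter.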
+ final StringInputRowParser parser = (StringInputRowParser) schema.getParser(); + + final InputRow row1bb = parser.parse( + ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}".getBytes(Charsets.UTF_8)) + ); + Assert.assertEquals(DateTimes.of("2000-01-01"), row1bb.getTimestamp()); + Assert.assertEquals("foo", row1bb.getRaw("dimA")); + Assert.assertEquals("foofoo", row1bb.getRaw("expr")); + + final InputRow row1string = parser.parse("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}"); + Assert.assertEquals(DateTimes.of("2000-01-01"), row1string.getTimestamp()); + Assert.assertEquals("foo", row1string.getRaw("dimA")); + Assert.assertEquals("foofoo", row1string.getRaw("expr")); + + final InputRow row2 = parser.parse( + ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"x\"}".getBytes(Charsets.UTF_8)) + ); + Assert.assertNull(row2); + } + @Test(expected = IAE.class) public void testOverlapMetricNameAndDim() throws Exception { @@ -148,6 +213,7 @@ public class DataSchemaTest new DoubleSumAggregatorFactory("metric2", "col2"), }, new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + null, jsonMapper ); schema.getParser(); @@ -181,6 +247,7 @@ public class DataSchemaTest new DoubleSumAggregatorFactory("metric1", "col3"), }, new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + null, jsonMapper ); schema.getParser(); @@ -255,7 +322,7 @@ public class DataSchemaTest null ) ); - Assert.assertEquals( + Assert.assertArrayEquals( actual.getAggregators(), new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1") diff --git a/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java b/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java new file mode 100644 index 00000000000..72bd851ae97 --- /dev/null +++ b/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.DateTimes; +import io.druid.query.expression.TestExprMacroTable; +import io.druid.query.filter.AndDimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class TransformSpecTest +{ + private static final MapInputRowParser PARSER = new MapInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec("t", "auto", DateTimes.of("2000-01-01")), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("f", "x", "y")), + null, + null + ) + ) + ); + + private static final Map<String, Object> ROW1 = ImmutableMap.<String, Object>builder() + .put("x", "foo") + .put("y", "bar") + .put("a", 2.0) + .put("b", 3L) + .build(); + + private static final Map<String, Object> ROW2 = ImmutableMap.<String, Object>builder() + .put("x", "foo") + .put("y", "baz") + .put("a", 2.0) + .put("b", 4L) + .build(); + + @Test + public void testTransforms() + { + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE), + new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE), + new ExpressionTransform("h", "concat(f,g)", TestExprMacroTable.INSTANCE) + ) + ); + + final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER); + final InputRow row = parser.parse(ROW1); + + Assert.assertNotNull(row); + Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(DateTimes.of("2000-01-01"), row.getTimestamp()); + Assert.assertEquals(ImmutableList.of("f", "x", "y"), row.getDimensions()); + Assert.assertEquals(ImmutableList.of("foo"), row.getDimension("x")); + Assert.assertEquals(3.0, row.getMetric("b").doubleValue(), 0); + Assert.assertEquals("foobar", row.getRaw("f")); + Assert.assertEquals(ImmutableList.of("foobar"), row.getDimension("f")); + Assert.assertEquals(ImmutableList.of("5.0"), row.getDimension("g")); + Assert.assertEquals(ImmutableList.of(), row.getDimension("h")); + Assert.assertEquals(5L, row.getMetric("g").longValue()); + } + + @Test + public void testTransformOverwriteField() + { + // Transforms are allowed to overwrite fields, and to refer to the fields they overwrite; double-check this.
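+ // Reviewer note, inferred from TransformedInputRow above: each RowFunction is evaluated against the
+ // underlying, untransformed row, so a transform named "x" that references "x" reads the original value
+ // rather than its own output; "concat(x,y)" therefore yields "foobar" here, with no self-recursion.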
+ + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("x", "concat(x,y)", TestExprMacroTable.INSTANCE) + ) + ); + + final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER); + final InputRow row = parser.parse(ROW1); + + Assert.assertNotNull(row); + Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(DateTimes.of("2000-01-01"), row.getTimestamp()); + Assert.assertEquals(ImmutableList.of("f", "x", "y"), row.getDimensions()); + Assert.assertEquals(ImmutableList.of("foobar"), row.getDimension("x")); + Assert.assertEquals(3.0, row.getMetric("b").doubleValue(), 0); + Assert.assertNull(row.getRaw("f")); + } + + @Test + public void testFilterOnTransforms() + { + // Filters are allowed to refer to transformed fields; double-check this. + + final TransformSpec transformSpec = new TransformSpec( + new AndDimFilter( + ImmutableList.of( + new SelectorDimFilter("x", "foo", null), + new SelectorDimFilter("f", "foobar", null), + new SelectorDimFilter("g", "5.0", null) + ) + ), + ImmutableList.of( + new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE), + new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE) + ) + ); + + final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER); + Assert.assertNotNull(parser.parse(ROW1)); + Assert.assertNull(parser.parse(ROW2)); + } + + @Test + public void testTransformTimeFromOtherFields() + { + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("__time", "(a + b) * 3600000", TestExprMacroTable.INSTANCE) + ) + ); + + final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER); + final InputRow row = parser.parse(ROW1); + + Assert.assertNotNull(row); + Assert.assertEquals(DateTimes.of("1970-01-01T05:00:00Z"), row.getTimestamp()); + Assert.assertEquals(DateTimes.of("1970-01-01T05:00:00Z").getMillis(), row.getTimestampFromEpoch()); + } + + @Test + public void testTransformTimeFromTime() + { + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("__time", "__time + 3600000", TestExprMacroTable.INSTANCE) + ) + ); + + final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER); + final InputRow row = parser.parse(ROW1); + + Assert.assertNotNull(row); + Assert.assertEquals(DateTimes.of("2000-01-01T01:00:00Z"), row.getTimestamp()); + Assert.assertEquals(DateTimes.of("2000-01-01T01:00:00Z").getMillis(), row.getTimestampFromEpoch()); + } + + @Test + public void testSerde() throws Exception + { + final TransformSpec transformSpec = new TransformSpec( + new AndDimFilter( + ImmutableList.of( + new SelectorDimFilter("x", "foo", null), + new SelectorDimFilter("f", "foobar", null), + new SelectorDimFilter("g", "5.0", null) + ) + ), + ImmutableList.of( + new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE), + new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE) + ) + ); + + final ObjectMapper jsonMapper = TestHelper.getJsonMapper(); + Assert.assertEquals( + transformSpec, + jsonMapper.readValue(jsonMapper.writeValueAsString(transformSpec), TransformSpec.class) + ); + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 91eb6fd5898..137b75b451f 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ 
b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -97,6 +97,7 @@ public class FireDepartmentTest new CountAggregatorFactory("count") }, new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null), + null, jsonMapper ), new RealtimeIOConfig( diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index cf4f9e6e90a..e82d7146d4a 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -145,6 +145,7 @@ public class RealtimeManagerTest null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ); schema2 = new DataSchema( @@ -152,6 +153,7 @@ public class RealtimeManagerTest null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ); RealtimeIOConfig ioConfig = new RealtimeIOConfig( @@ -295,6 +297,7 @@ public class RealtimeManagerTest null, new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ); @@ -331,8 +334,8 @@ public class RealtimeManagerTest } Assert.assertEquals(1, realtimeManager.getMetrics("test").processed()); - Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway()); - Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable()); + Assert.assertEquals(2, realtimeManager.getMetrics("test").thrownAway()); + Assert.assertEquals(1, realtimeManager.getMetrics("test").unparseable()); Assert.assertTrue(plumber.isStartedJob()); Assert.assertTrue(plumber.isFinishedJob()); Assert.assertEquals(0, plumber.getPersistCount()); @@ -859,6 +862,7 @@ public class RealtimeManagerTest return rows.hasNext(); } + @Nullable @Override public InputRow nextRow() { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 1bf80a4f18e..766fa19709b 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -21,20 +21,17 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -//CHECKSTYLE.OFF: Regexp -import com.metamx.common.logger.Logger; -//CHECKSTYLE.ON: Regexp import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.core.NoopEmitter; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; -import io.druid.java.util.common.concurrent.Execs; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.IntervalChunkingQueryRunnerDecorator; @@ -130,6 +127,7 @@ public class 
AppenderatorTester implements AutoCloseable new LongSumAggregatorFactory("met", "met") }, new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null, objectMapper ); @@ -170,11 +168,7 @@ public class AppenderatorTester implements AutoCloseable emitter = new ServiceEmitter( "test", "test", - new LoggingEmitter( - new Logger(AppenderatorTester.class), - LoggingEmitter.Level.INFO, - objectMapper - ) + new NoopEmitter() ); emitter.start(); EmittingLogger.registerEmitter(emitter); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 52a5aa9a431..39bc73eaa57 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -128,6 +128,7 @@ public class DefaultOfflineAppenderatorFactoryTest new LongSumAggregatorFactory("met", "met") }, new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null, objectMapper ); diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index 0d983788f39..67ca7508d7e 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -47,6 +47,7 @@ import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; +import io.druid.segment.indexing.TransformSpec; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -117,6 +118,7 @@ public class IngestSegmentFirehoseTest final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval()); final IngestSegmentFirehose firehose = new IngestSegmentFirehose( ImmutableList.of(wsa, wsa), + TransformSpec.NONE, ImmutableList.of("host", "spatial"), ImmutableList.of("visited_sum", "unique_hosts"), null @@ -149,6 +151,7 @@ public class IngestSegmentFirehoseTest // Do a spatial filter final IngestSegmentFirehose firehose2 = new IngestSegmentFirehose( ImmutableList.of(new WindowedStorageAdapter(queryable, Intervals.of("2000/3000"))), + TransformSpec.NONE, ImmutableList.of("host", "spatial"), ImmutableList.of("visited_sum", "unique_hosts"), new SpatialDimFilter("spatial", new RadiusBound(new float[]{1, 0}, 0.1f)) diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index a0015353b01..f9ddac4413f 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -142,6 +142,7 @@ public class RealtimePlumberSchoolTest ), new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ); @@ -161,6 +162,7 @@ public class RealtimePlumberSchoolTest ), new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.YEAR, Granularities.NONE, 
null), + null, jsonMapper ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 291a5e8aa4b..08eb656c3f3 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -53,6 +53,7 @@ public class SinkTest null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null), + null, new DefaultObjectMapper() ); diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 4c55781634b..06ae9b7c5d0 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -150,6 +150,7 @@ public class DruidJsonValidatorTest null, new AggregatorFactory[0], new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), + null, jsonMapper ), new RealtimeIOConfig(
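End-to-end usage sketch (reviewer's illustration, not part of the patch): the snippet below wires a TransformSpec onto a plain MapInputRowParser the same way TransformSpecTest does. It assumes ExpressionTransform lives in io.druid.segment.indexing next to TransformSpec and that the test-only TestExprMacroTable is on the classpath; the class name TransformSpecUsageExample and its field names are hypothetical.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.query.expression.TestExprMacroTable;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;

import java.util.Map;

public class TransformSpecUsageExample
{
  public static void main(String[] args)
  {
    // A plain map-based parser: timestamp from "t", dimensions "x" and "y".
    final MapInputRowParser parser = new MapInputRowParser(
        new TimeAndDimsParseSpec(
            new TimestampSpec("t", "auto", null),
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x", "y")), null, null)
        )
    );

    // Keep only rows where x == "foo", and derive a new field "xy" = concat(x, y).
    final TransformSpec transformSpec = new TransformSpec(
        new SelectorDimFilter("x", "foo", null),
        ImmutableList.of(new ExpressionTransform("xy", "concat(x,y)", TestExprMacroTable.INSTANCE))
    );

    // decorate() always wraps the parser, even when the spec is a no-op.
    final InputRowParser<Map<String, Object>> decorated = transformSpec.decorate(parser);

    final InputRow kept = decorated.parse(
        ImmutableMap.<String, Object>of("t", "2000-01-01", "x", "foo", "y", "bar")
    );
    System.out.println(kept.getRaw("xy")); // prints "foobar"

    final InputRow dropped = decorated.parse(
        ImmutableMap.<String, Object>of("t", "2000-01-01", "x", "other", "y", "bar")
    );
    System.out.println(dropped); // prints "null": the filter removed the row
  }
}

As in the tests above, a row rejected by the filter comes back as null from parse(), which is why the various Firehose.nextRow() implementations touched by this patch gained @Nullable annotations.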