Introduce "transformSpec" at ingest-time. (#4890)

* Introduce "transformSpec" at ingest-time.

It accepts a "filter" (standard query filter object) and "transforms" (a
list of objects with "name" and "expression"). These can be used to do
filtering and single-row transforms without need for a separate data
processing job.

The "expression" fields use the same expression language as other
expression-based feature.
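
As a concrete illustration, here is a minimal Java-side sketch of such a spec, built with the TransformSpec, ExpressionTransform, and SelectorDimFilter classes that this patch adds or exercises in its tests; the "filter" and "transforms" fields map onto the constructor arguments (the wrapper class name and column values are chosen for illustration only):

```java
import com.google.common.collect.ImmutableList;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;

public class TransformSpecExample
{
  // Keep only rows where dim1 == "b" (the "filter"), and add a derived column
  // "dim1t" = concat(dim1, dim1) (a single entry in "transforms").
  public static TransformSpec example()
  {
    return new TransformSpec(
        new SelectorDimFilter("dim1", "b", null),
        ImmutableList.of(
            new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
        )
    );
  }
}
```

A DataSchema can then pick it up via its withTransformSpec method, which is how the new Kafka and Hadoop mapper tests below wire it in.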

* Remove forbidden API.

* Fix compile error.

* Fix tests.

* Some more changes.

- Add nullable annotation to Firehose.nextRow (see the sketch after this list).
- Add tests for index task, realtime task, kafka task, hadoop mapper,
  and ingestSegment firehose.
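
Because Firehose.nextRow (like InputRowParser.parse) may now return null for rows dropped by the ingest-time filter or thrown away by the parser, callers are expected to null-check. A minimal sketch of a consuming loop under that contract (the helper class and method names are invented for illustration):

```java
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import java.util.ArrayList;
import java.util.List;

public class NullableFirehoseExample
{
  // Drain a firehose, skipping rows that the parser or ingest-time filter dropped (null).
  public static List<InputRow> drain(final Firehose firehose)
  {
    final List<InputRow> rows = new ArrayList<>();
    while (firehose.hasMore()) {
      final InputRow row = firehose.nextRow();
      if (row != null) {
        rows.add(row);
      }
      // A null row means it was filtered out or thrown away; there is nothing to add.
    }
    return rows;
  }
}
```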

* Fix bad merge.

* Adjust imports.

* Adjust whitespace.

* Make Transform into an interface.

* Add missing annotation.

* Switch logger.

* Switch logger.

* Adjust test.

* Adjustment to handling for DatasourceIngestionSpec.

* Fix test.

* CR comments.

* Remove unused method.

* Add javadocs.

* More javadocs, and always decorate.

* Fix bug in TransformingStringInputRowParser.

* Fix bad merge.

* Fix ISFF tests.

* Fix DORC test.
Gian Merlino 2017-10-30 17:38:52 -07:00 committed by GitHub
parent 1df458b35e
commit 0ce406bdf1
74 changed files with 2011 additions and 455 deletions

View File

@ -21,15 +21,10 @@ package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -38,8 +33,6 @@ import java.util.Map;
@PublicApi
public class MapBasedRow implements Row
{
private static final Long LONG_ZERO = 0L;
private final DateTime timestamp;
private final Map<String, Object> event;
@ -83,16 +76,7 @@ public class MapBasedRow implements Row
@Override
public List<String> getDimension(String dimension)
{
final Object dimValue = event.get(dimension);
if (dimValue == null) {
return Collections.emptyList();
} else if (dimValue instanceof List) {
// guava's toString function fails on null objects, so please do not use it
return Lists.transform((List) dimValue, String::valueOf);
} else {
return Collections.singletonList(String.valueOf(dimValue));
}
return Rows.objectToStrings(event.get(dimension));
}
@Override
@ -104,44 +88,7 @@ public class MapBasedRow implements Row
@Override
public Number getMetric(String metric)
{
Object metricValue = event.get(metric);
if (metricValue == null) {
return LONG_ZERO;
}
if (metricValue instanceof Number) {
return (Number) metricValue;
} else if (metricValue instanceof String) {
try {
String metricValueString = StringUtils.removeChar(((String) metricValue).trim(), ',');
// Longs.tryParse() doesn't support leading '+', so we need to trim it ourselves
metricValueString = trimLeadingPlusOfLongString(metricValueString);
Long v = Longs.tryParse(metricValueString);
// Do NOT use ternary operator here, because it makes Java to convert Long to Double
if (v != null) {
return v;
} else {
return Double.valueOf(metricValueString);
}
}
catch (Exception e) {
throw new ParseException(e, "Unable to parse metrics[%s], value[%s]", metric, metricValue);
}
} else {
throw new ParseException("Unknown type[%s]", metricValue.getClass());
}
}
private static String trimLeadingPlusOfLongString(String metricValueString)
{
if (metricValueString.length() > 1 && metricValueString.charAt(0) == '+') {
char secondChar = metricValueString.charAt(1);
if (secondChar >= '0' && secondChar <= '9') {
metricValueString = metricValueString.substring(1);
}
}
return metricValueString;
return Rows.objectToNumber(metric, event.get(metric));
}
@Override

View File

@ -22,7 +22,12 @@ package io.druid.data.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.parsers.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -31,9 +36,12 @@ import java.util.Set;
*/
public class Rows
{
public static final Long LONG_ZERO = 0L;
/**
* @param timeStamp rollup up timestamp to be used to create group key
* @param inputRow input row
* @param inputRow input row
*
* @return groupKey for the given input row
*/
public static List<Object> toGroupKey(long timeStamp, InputRow inputRow)
@ -50,4 +58,77 @@ public class Rows
dims
);
}
/**
* Convert an object to a list of strings.
*/
public static List<String> objectToStrings(final Object inputValue)
{
if (inputValue == null) {
return Collections.emptyList();
} else if (inputValue instanceof List) {
// guava's toString function fails on null objects, so please do not use it
final List<Object> values = (List) inputValue;
final List<String> retVal = new ArrayList<>(values.size());
for (Object val : values) {
retVal.add(String.valueOf(val));
}
return retVal;
} else {
return Collections.singletonList(String.valueOf(inputValue));
}
}
/**
* Convert an object to a number. Nulls are treated as zeroes.
*
* @param name field name of the object being converted (may be used for exception messages)
* @param inputValue the actual object being converted
*
* @return a number
*
* @throws NullPointerException if the string is null
* @throws ParseException if the column cannot be converted to a number
*/
public static Number objectToNumber(final String name, final Object inputValue)
{
if (inputValue == null) {
return Rows.LONG_ZERO;
}
if (inputValue instanceof Number) {
return (Number) inputValue;
} else if (inputValue instanceof String) {
try {
String metricValueString = StringUtils.removeChar(((String) inputValue).trim(), ',');
// Longs.tryParse() doesn't support leading '+', so we need to trim it ourselves
metricValueString = trimLeadingPlusOfLongString(metricValueString);
Long v = Longs.tryParse(metricValueString);
// Do NOT use ternary operator here, because it makes Java to convert Long to Double
if (v != null) {
return v;
} else {
return Double.valueOf(metricValueString);
}
}
catch (Exception e) {
throw new ParseException(e, "Unable to parse value[%s] for field[%s]", inputValue, name);
}
} else {
throw new ParseException("Unknown type[%s] for field", inputValue.getClass(), inputValue);
}
}
private static String trimLeadingPlusOfLongString(String metricValueString)
{
if (metricValueString.length() > 1 && metricValueString.charAt(0) == '+') {
char secondChar = metricValueString.charAt(1);
if (secondChar >= '0' && secondChar <= '9') {
metricValueString = metricValueString.substring(1);
}
}
return metricValueString;
}
}

View File

@ -121,7 +121,8 @@ public class DimensionsSpec
return dimensionExclusions;
}
@Deprecated @JsonIgnore
@Deprecated
@JsonIgnore
public List<SpatialDimensionSchema> getSpatialDimensions()
{
Iterable<NewSpatialDimensionSchema> filteredList = Iterables.filter(
@ -244,4 +245,13 @@ public class DimensionsSpec
result = 31 * result + dimensionExclusions.hashCode();
return result;
}
@Override
public String toString()
{
return "DimensionsSpec{" +
"dimensions=" + dimensions +
", dimensionExclusions=" + dimensionExclusions +
'}';
}
}

View File

@ -24,6 +24,7 @@ import io.druid.data.input.InputRow;
import io.druid.utils.Runnables;
import org.apache.commons.io.LineIterator;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
@ -69,6 +70,7 @@ public class FileIteratingFirehose implements Firehose
return lineIterator != null && lineIterator.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.ExtensionPoint;
import javax.annotation.Nullable;
@ExtensionPoint
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes(value = {
@ -33,6 +35,11 @@ import io.druid.guice.annotations.ExtensionPoint;
})
public interface InputRowParser<T>
{
/**
* Parse an input into an {@link InputRow}. Return null if this input should be thrown away, or throws
* {@code ParseException} if the input is unparseable.
*/
@Nullable
InputRow parse(T input);
ParseSpec getParseSpec();

View File

@ -30,6 +30,7 @@ import io.druid.java.util.common.parsers.Parser;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -97,4 +98,38 @@ public class JSONParseSpec extends ParseSpec
{
return featureSpec;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
final JSONParseSpec that = (JSONParseSpec) o;
return Objects.equals(flattenSpec, that.flattenSpec) &&
Objects.equals(featureSpec, that.featureSpec);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), flattenSpec, featureSpec);
}
@Override
public String toString()
{
return "JSONParseSpec{" +
"timestampSpec=" + getTimestampSpec() +
", dimensionsSpec=" + getDimensionsSpec() +
", flattenSpec=" + flattenSpec +
", featureSpec=" + featureSpec +
'}';
}
}

View File

@ -13,13 +13,16 @@ This expression language supports the following operators (listed in decreasing
|<, <=, >, >=, ==, !=|Binary Comparison|
|&&,\|\||Binary Logical AND, OR|
Long, double and string data types are supported. If a number contains a dot, it is interpreted as a double, otherwise it is interpreted as a long. That means, always add a '.' to your number if you want it interpreted as a double value. String literal should be quoted by single quotation marks.
Long, double, and string data types are supported. If a number contains a dot, it is interpreted as a double, otherwise it is interpreted as a long. That means, always add a '.' to your number if you want it interpreted as a double value. String literals should be quoted by single quotation marks.
Expressions can contain variables. Variable names may contain letters, digits, '\_' and '$'. Variable names must not begin with a digit. To escape other special characters, user can quote it with double quotation marks.
Multi-value types are not fully supported yet. Expressions may behave inconsistently on multi-value types, and you
should not rely on the behavior in this case to stay the same in future releases.
For logical operators, a number is true if and only if it is positive (0 or minus value means false). For string type, it's evaluation result of 'Boolean.valueOf(string)'.
Expressions can contain variables. Variable names may contain letters, digits, '\_' and '$'. Variable names must not begin with a digit. To escape other special characters, you can quote it with double quotation marks.
Also, the following built-in functions are supported.
For logical operators, a number is true if and only if it is positive (0 or negative value means false). For string type, it's the evaluation result of 'Boolean.valueOf(string)'.
The following built-in functions are available.
## General functions
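
Tying the literal and operator rules above back to this patch's ingest-time transforms: a hedged sketch using the ExpressionTransform constructor added in this commit (the output column names and referenced fields are invented for illustration):

```java
import com.google.common.collect.ImmutableList;
import io.druid.math.expr.ExprMacroTable;
import io.druid.segment.indexing.ExpressionTransform;
import java.util.List;

public class ExpressionLanguageExamples
{
  public static List<ExpressionTransform> examples()
  {
    return ImmutableList.of(
        // The '.' in 0.8 and 1.0 makes them double literals, so the arithmetic is done in doubles.
        new ExpressionTransform("net_price", "price * 0.8 + 1.0", ExprMacroTable.nil()),
        // String literals are single-quoted; == compares values as listed in the operator table above.
        new ExpressionTransform("is_gold", "tier == 'gold'", ExprMacroTable.nil())
    );
  }
}
```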

View File

@ -250,6 +250,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
}
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -34,14 +34,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -57,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
public class RocketMQFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>>
{
private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class);
@ -139,7 +140,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
@Override
public Firehose connect(
ByteBufferInputRowParser byteBufferInputRowParser,
InputRowParser<ByteBuffer> byteBufferInputRowParser,
File temporaryDirectory
) throws IOException, ParseException
{
@ -149,7 +150,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
Sets.newHashSet("feed")
);
final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec(
final InputRowParser<ByteBuffer> theParser = byteBufferInputRowParser.withParseSpec(
byteBufferInputRowParser.getParseSpec()
.withDimensionsSpec(
byteBufferInputRowParser.getParseSpec()
@ -247,6 +248,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
return hasMore;
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -19,6 +19,7 @@
package io.druid.data.input.orc;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
@ -90,7 +91,7 @@ public class DruidOrcInputFormatTest
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
OrcHadoopInputRowParser parser = (OrcHadoopInputRowParser) config.getParser();
InputRowParser<OrcStruct> parser = (InputRowParser<OrcStruct>) config.getParser();
reader.initialize(split, context);

View File

@ -206,6 +206,7 @@ public class OrcIndexGeneratorJobTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
),
null,
mapper
),
new HadoopIOConfig(

View File

@ -30,10 +30,10 @@ import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.java.util.common.logger.Logger;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
@ -41,6 +41,7 @@ import net.jodah.lyra.config.Config;
import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -100,7 +101,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* For more information on RabbitMQ high availability please see:
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
*/
public class RabbitMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>>
{
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@ -135,7 +136,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
}
@Override
public Firehose connect(final ByteBufferInputRowParser firehoseParser, File temporaryDirectory) throws IOException
public Firehose connect(final InputRowParser<ByteBuffer> firehoseParser, File temporaryDirectory) throws IOException
{
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
Config lyraConfig = new Config()
@ -225,6 +226,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
return false;
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -24,11 +24,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.java.util.common.logger.Logger;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
@ -37,6 +36,7 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.InvalidMessageException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -47,7 +47,7 @@ import java.util.Set;
/**
*/
public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>>
{
private static final Logger log = new Logger(KafkaEightFirehoseFactory.class);
@ -69,13 +69,14 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
}
@Override
public Firehose connect(final ByteBufferInputRowParser firehoseParser, File temporaryDirectory) throws IOException
public Firehose connect(final InputRowParser<ByteBuffer> firehoseParser, File temporaryDirectory) throws IOException
{
Set<String> newDimExclus = Sets.union(
firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(),
Sets.newHashSet("feed")
);
final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec(
final InputRowParser<ByteBuffer> theParser = firehoseParser.withParseSpec(
firehoseParser.getParseSpec()
.withDimensionsSpec(
firehoseParser.getParseSpec()
@ -111,6 +112,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return iter.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -80,8 +80,8 @@ import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceAction;
@ -466,17 +466,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try {
final byte[] valueBytes = record.value();
if (valueBytes == null) {
throw new ParseException("null value");
}
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(valueBytes)), "row");
final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp());
final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
if (!beforeMinimumMessageTime && !afterMaximumMessageTime) {
final InputRow row = valueBytes == null ? null : parser.parse(ByteBuffer.wrap(valueBytes));
if (row != null && withinMinMaxRecordTime(row)) {
final String sequenceName = sequenceNames.get(record.partition());
final AppenderatorDriverAddResult addResult = driver.add(
row,
@ -499,21 +491,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
fireDepartmentMetrics.incrementProcessed();
} else {
if (log.isDebugEnabled()) {
if (beforeMinimumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMinimumMessageTime().get()
);
} else if (afterMaximumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMaximumMessageTime().get()
);
}
}
fireDepartmentMetrics.incrementThrownAway();
}
}
@ -1230,4 +1207,31 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
}
}
private boolean withinMinMaxRecordTime(final InputRow row)
{
final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent()
&& ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp());
final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
&& ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
if (log.isDebugEnabled()) {
if (beforeMinimumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMinimumMessageTime().get()
);
} else if (afterMaximumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMaximumMessageTime().get()
);
}
}
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
}
}

View File

@ -37,20 +37,14 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
//CHECKSTYLE.OFF: Regexp
import com.metamx.common.logger.Logger;
//CHECKSTYLE.ON: Regexp
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.discovery.DataNodeService;
@ -76,14 +70,18 @@ import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
@ -103,6 +101,7 @@ import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@ -113,6 +112,8 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusher;
@ -186,7 +187,7 @@ public class KafkaIndexTaskTest
new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of("dim1", "dim2")),
DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of("dim1", "dim1t", "dim2")),
null,
null
),
@ -199,6 +200,7 @@ public class KafkaIndexTaskTest
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
objectMapper
);
@ -235,11 +237,7 @@ public class KafkaIndexTaskTest
emitter = new ServiceEmitter(
"service",
"host",
new LoggingEmitter(
log,
LoggingEmitter.Level.ERROR,
new DefaultObjectMapper()
)
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);
@ -325,7 +323,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -349,8 +346,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -369,7 +366,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -405,8 +401,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -425,7 +421,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -461,8 +456,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -481,7 +476,6 @@ public class KafkaIndexTaskTest
DateTimes.of("2010"),
false
),
null,
null
);
@ -518,9 +512,71 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("a"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc3));
Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3));
}
@Test(timeout = 60_000L)
public void testRunWithTransformSpec() throws Exception
{
final KafkaIndexTask task = createTask(
null,
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
new SelectorDimFilter("dim1", "b", null),
ImmutableList.of(
new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
)
)
),
new KafkaIOConfig(
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
),
null
);
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for the task to start reading
while (task.getStatus() != KafkaIndexTask.Status.READING) {
Thread.sleep(10);
}
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
// Wait for task to exit
Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(1, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(4, task.getFireDepartmentMetrics().thrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", desc1));
}
@Test(timeout = 60_000L)
@ -546,7 +602,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -589,7 +644,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -613,8 +667,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -643,7 +697,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -667,8 +720,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -696,7 +749,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -731,7 +783,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
final KafkaIndexTask task2 = createTask(
@ -747,7 +798,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -783,8 +833,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -803,7 +853,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
final KafkaIndexTask task2 = createTask(
@ -819,7 +868,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -843,8 +891,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
// Check published segments & metadata, should all be from the first task
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -856,8 +904,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -876,7 +924,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
final KafkaIndexTask task2 = createTask(
@ -892,7 +939,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -922,8 +968,8 @@ public class KafkaIndexTaskTest
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(2, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
// Check published segments & metadata
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
@ -932,10 +978,10 @@ public class KafkaIndexTaskTest
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3));
Assert.assertEquals(ImmutableList.of("f"), readSegmentDim1(desc4));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
}
@Test(timeout = 60_000L)
@ -954,7 +1000,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -988,13 +1033,13 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc4));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc4));
// Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
Assert.assertEquals(
ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
ImmutableSet.of(readSegmentDim1(desc2), readSegmentDim1(desc3))
ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
);
}
@ -1014,7 +1059,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
final KafkaIndexTask task2 = createTask(
@ -1030,7 +1074,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1067,9 +1110,9 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc3));
}
@Test(timeout = 60_000L)
@ -1088,7 +1131,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1125,7 +1167,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1159,8 +1200,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -1179,7 +1220,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1245,8 +1285,8 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
@ -1265,7 +1305,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1333,9 +1372,9 @@ public class KafkaIndexTaskTest
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc1));
Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc2));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
}
@Test(timeout = 30_000L)
@ -1354,7 +1393,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
null
);
@ -1394,7 +1432,6 @@ public class KafkaIndexTaskTest
null,
false
),
null,
true
);
@ -1464,13 +1501,22 @@ public class KafkaIndexTaskTest
private KafkaIndexTask createTask(
final String taskId,
final KafkaIOConfig ioConfig,
final Integer maxRowsPerSegment,
final Boolean resetOffsetAutomatically
)
{
return createTask(taskId, DATA_SCHEMA, ioConfig, resetOffsetAutomatically);
}
private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIOConfig ioConfig,
final Boolean resetOffsetAutomatically
)
{
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
1000,
maxRowsPerSegment,
null,
new Period("P1Y"),
null,
null,
@ -1483,7 +1529,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = new KafkaIndexTask(
taskId,
null,
cloneDataSchema(),
cloneDataSchema(dataSchema),
tuningConfig,
ioConfig,
null,
@ -1494,13 +1540,14 @@ public class KafkaIndexTaskTest
return task;
}
private static DataSchema cloneDataSchema()
private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
DATA_SCHEMA.getDataSource(),
DATA_SCHEMA.getParserMap(),
DATA_SCHEMA.getAggregators(),
DATA_SCHEMA.getGranularitySpec(),
dataSchema.getDataSource(),
dataSchema.getParserMap(),
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
objectMapper
);
}
@ -1696,7 +1743,7 @@ public class KafkaIndexTaskTest
return new File(directory, "segments");
}
private List<String> readSegmentDim1(final SegmentDescriptor descriptor) throws IOException
private List<String> readSegmentColumn(final String column, final SegmentDescriptor descriptor) throws IOException
{
File indexZip = new File(
StringUtils.format(
@ -1728,11 +1775,11 @@ public class KafkaIndexTaskTest
);
IndexIO indexIO = new TestUtils().getTestIndexIO();
QueryableIndex index = indexIO.loadIndex(outputLocation);
DictionaryEncodedColumn<String> dim1 = index.getColumn("dim1").getDictionaryEncoding();
DictionaryEncodedColumn<String> theColumn = index.getColumn(column).getDictionaryEncoding();
List<String> values = Lists.newArrayList();
for (int i = 0; i < dim1.length(); i++) {
int id = dim1.getSingleValueRow(i);
String value = dim1.lookupName(id);
for (int i = 0; i < theColumn.length(); i++) {
int id = theColumn.getSingleValueRow(i);
String value = theColumn.lookupName(id);
values.add(value);
}
return values;

View File

@ -1895,6 +1895,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Granularities.NONE,
ImmutableList.<Interval>of()
),
null,
objectMapper
);
}

View File

@ -30,6 +30,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import javax.annotation.Nullable;
import java.io.IOException;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
@ -75,10 +76,15 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
if (reportParseExceptions) {
throw e;
}
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value);
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
}
if (inputRow == null) {
// Throw away null rows from the parser.
log.debug("Throwing away row [%s]", value);
return;
}
if (!granularitySpec.bucketIntervals().isPresent()
@ -92,7 +98,8 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
public final static InputRow parseInputRow(Object value, InputRowParser parser)
@Nullable
public static InputRow parseInputRow(Object value, InputRowParser parser)
{
if (parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
@ -101,6 +108,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
return ((StringInputRowParser) parser).parse(value.toString());
} else if (value instanceof InputRow) {
return (InputRow) value;
} else if (value == null) {
// Pass through nulls so they get thrown away.
return null;
} else {
return parser.parse(value);
}

View File

@ -26,10 +26,12 @@ import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.filter.DimFilter;
import io.druid.segment.indexing.TransformSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
import java.util.Objects;
public class DatasourceIngestionSpec
{
@ -41,6 +43,10 @@ public class DatasourceIngestionSpec
private final List<String> metrics;
private final boolean ignoreWhenNoSegments;
// Note that the only purpose of the transformSpec field is to hold the value from the overall dataSchema.
// It is not meant to be provided by end users, and will be overwritten.
private final TransformSpec transformSpec;
@JsonCreator
public DatasourceIngestionSpec(
@JsonProperty("dataSource") String dataSource,
@ -50,7 +56,8 @@ public class DatasourceIngestionSpec
@JsonProperty("filter") DimFilter filter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
@JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments,
@JsonProperty("transformSpec") TransformSpec transformSpec
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
@ -59,7 +66,7 @@ public class DatasourceIngestionSpec
interval == null || intervals == null,
"please specify intervals only"
);
List<Interval> theIntervals = null;
if (interval != null) {
theIntervals = ImmutableList.of(interval);
@ -78,6 +85,7 @@ public class DatasourceIngestionSpec
this.metrics = metrics;
this.ignoreWhenNoSegments = ignoreWhenNoSegments;
this.transformSpec = transformSpec != null ? transformSpec : TransformSpec.NONE;
}
@JsonProperty
@ -122,6 +130,12 @@ public class DatasourceIngestionSpec
return ignoreWhenNoSegments;
}
@JsonProperty
public TransformSpec getTransformSpec()
{
return transformSpec;
}
public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{
return new DatasourceIngestionSpec(
@ -132,7 +146,8 @@ public class DatasourceIngestionSpec
filter,
dimensions,
metrics,
ignoreWhenNoSegments
ignoreWhenNoSegments,
transformSpec
);
}
@ -146,7 +161,8 @@ public class DatasourceIngestionSpec
filter,
dimensions,
metrics,
ignoreWhenNoSegments
ignoreWhenNoSegments,
transformSpec
);
}
@ -160,7 +176,8 @@ public class DatasourceIngestionSpec
filter,
dimensions,
metrics,
ignoreWhenNoSegments
ignoreWhenNoSegments,
transformSpec
);
}
@ -174,12 +191,28 @@ public class DatasourceIngestionSpec
filter,
dimensions,
metrics,
ignoreWhenNoSegments
ignoreWhenNoSegments,
transformSpec
);
}
public DatasourceIngestionSpec withTransformSpec(TransformSpec transformSpec)
{
return new DatasourceIngestionSpec(
dataSource,
null,
intervals,
segments,
filter,
dimensions,
metrics,
ignoreWhenNoSegments,
transformSpec
);
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -187,42 +220,30 @@ public class DatasourceIngestionSpec
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
if (ignoreWhenNoSegments != that.ignoreWhenNoSegments) {
return false;
}
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!intervals.equals(that.intervals)) {
return false;
}
if (segments != null ? !segments.equals(that.segments) : that.segments != null) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null);
final DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
return ignoreWhenNoSegments == that.ignoreWhenNoSegments &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(intervals, that.intervals) &&
Objects.equals(segments, that.segments) &&
Objects.equals(filter, that.filter) &&
Objects.equals(dimensions, that.dimensions) &&
Objects.equals(metrics, that.metrics) &&
Objects.equals(transformSpec, that.transformSpec);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + intervals.hashCode();
result = 31 * result + (segments != null ? segments.hashCode() : 0);
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
return result;
return Objects.hash(
dataSource,
intervals,
segments,
filter,
dimensions,
metrics,
ignoreWhenNoSegments,
transformSpec
);
}
@Override
@ -236,6 +257,7 @@ public class DatasourceIngestionSpec
", dimensions=" + dimensions +
", metrics=" + metrics +
", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
", transformSpec=" + transformSpec +
'}';
}
}

View File

@ -24,13 +24,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -61,6 +59,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
public static final String CONF_INPUT_SEGMENTS = "druid.segments";
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";
public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec";
public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size";
@Override

View File

@ -27,8 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
@ -57,7 +56,7 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
private IngestSegmentFirehose firehose;
private int rowNum;
private MapBasedRow currRow;
private Row currRow;
private List<QueryableIndex> indexes = Lists.newArrayList();
private List<File> tmpSegmentDirs = Lists.newArrayList();
@ -108,18 +107,18 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
firehose = new IngestSegmentFirehose(
adapters,
spec.getTransformSpec(),
spec.getDimensions(),
spec.getMetrics(),
spec.getFilter()
);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (firehose.hasMore()) {
currRow = (MapBasedRow) firehose.nextRow();
currRow = firehose.nextRow();
rowNum++;
return true;
} else {
@ -136,13 +135,7 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
@Override
public InputRow getCurrentValue() throws IOException, InterruptedException
{
return new SegmentInputRow(
new MapBasedInputRow(
currRow.getTimestamp(),
spec.getDimensions(),
currRow.getEvent()
)
);
return currRow == null ? null : new SegmentInputRow(currRow, spec.getDimensions());
}
@Override

View File

@ -33,17 +33,19 @@ import java.util.List;
*/
public class SegmentInputRow implements InputRow
{
private final InputRow delegate;
private final Row delegate;
private final List<String> dimensions;
public SegmentInputRow(InputRow delegate)
public SegmentInputRow(Row delegate, List<String> dimensions)
{
this.delegate = delegate;
this.dimensions = dimensions;
}
@Override
public List<String> getDimensions()
{
return delegate.getDimensions();
return dimensions;
}
@Override
@ -82,11 +84,6 @@ public class SegmentInputRow implements InputRow
return delegate.compareTo(row);
}
public InputRow getDelegate()
{
return delegate;
}
@Override
public String toString()
{

View File

@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
@ -158,6 +157,11 @@ public class DatasourcePathSpec implements PathSpec
updatedIngestionSpec = updatedIngestionSpec.withQueryGranularity(config.getGranularitySpec().getQueryGranularity());
// propagate in the transformSpec from the overall job config
updatedIngestionSpec = updatedIngestionSpec.withTransformSpec(
config.getSchema().getDataSchema().getTransformSpec()
);
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));

View File

@ -47,6 +47,7 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
@ -322,6 +323,7 @@ public class BatchDeltaIngestionTest
Firehose firehose = new IngestSegmentFirehose(
ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
TransformSpec.NONE,
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
null
@ -363,6 +365,7 @@ public class BatchDeltaIngestionTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
),
null,
MAPPER
),
new HadoopIOConfig(

View File

@ -181,6 +181,7 @@ public class DetermineHashedPartitionsJobTest
Granularities.NONE,
intervals
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -239,6 +239,7 @@ public class DeterminePartitionsJobTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of(interval))
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -73,6 +73,7 @@ public class HadoopDruidIndexerConfigTest
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
jsonMapper
),
new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
@ -136,6 +137,7 @@ public class HadoopDruidIndexerConfigTest
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
jsonMapper
),
new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),

View File

@ -0,0 +1,213 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.path.StaticPathSpec;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.parsers.JSONPathSpec;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class HadoopDruidIndexerMapperTest
{
private static final ObjectMapper JSON_MAPPER = TestHelper.getJsonMapper();
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
JSON_MAPPER.convertValue(
new HadoopyStringInputRowParser(
new JSONParseSpec(
new TimestampSpec("t", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t", "dim2")),
null,
null
),
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
)
),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
JSON_MAPPER
);
private static final HadoopIOConfig IO_CONFIG = new HadoopIOConfig(
JSON_MAPPER.convertValue(
new StaticPathSpec("dummyPath", null),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
),
null,
"dummyOutputPath"
);
private static final HadoopTuningConfig TUNING_CONFIG = HadoopTuningConfig
.makeDefaultTuningConfig()
.withWorkingPath("dummyWorkingPath");
@Test
public void testHadoopyStringParser() throws Exception
{
final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(DATA_SCHEMA, IO_CONFIG, TUNING_CONFIG)
);
final MyMapper mapper = new MyMapper();
final Configuration hadoopConfig = new Configuration();
hadoopConfig.set(
HadoopDruidIndexerConfig.CONFIG_PROPERTY,
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(config)
);
final Mapper.Context mapContext = EasyMock.mock(Mapper.Context.class);
EasyMock.expect(mapContext.getConfiguration()).andReturn(hadoopConfig).once();
EasyMock.replay(mapContext);
mapper.setup(mapContext);
final List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "x", "m1", 1.0),
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim2", "y", "m1", 1.0)
);
for (Map<String, Object> row : rows) {
mapper.map(NullWritable.get(), new Text(JSON_MAPPER.writeValueAsString(row)), mapContext);
}
assertRowListEquals(rows, mapper.getRows());
}
@Test
public void testHadoopyStringParserWithTransformSpec() throws Exception
{
final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
DATA_SCHEMA.withTransformSpec(
new TransformSpec(
new SelectorDimFilter("dim1", "foo", null),
ImmutableList.of(
new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
)
)
),
IO_CONFIG,
TUNING_CONFIG
)
);
final MyMapper mapper = new MyMapper();
final Configuration hadoopConfig = new Configuration();
hadoopConfig.set(
HadoopDruidIndexerConfig.CONFIG_PROPERTY,
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(config)
);
final Mapper.Context mapContext = EasyMock.mock(Mapper.Context.class);
EasyMock.expect(mapContext.getConfiguration()).andReturn(hadoopConfig).once();
EasyMock.replay(mapContext);
mapper.setup(mapContext);
final List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim2", "x", "m1", 1.0),
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "bar", "dim2", "y", "m1", 1.0),
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim2", "z", "m1", 1.0)
);
for (Map<String, Object> row : rows) {
mapper.map(NullWritable.get(), new Text(JSON_MAPPER.writeValueAsString(row)), mapContext);
}
assertRowListEquals(
ImmutableList.of(
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim1t", "foofoo", "dim2", "x", "m1", 1.0),
ImmutableMap.of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo", "dim1t", "foofoo", "dim2", "z", "m1", 1.0)
),
mapper.getRows()
);
}
private static void assertRowListEquals(final List<Map<String, Object>> expected, final List<InputRow> actual)
{
Assert.assertEquals(
expected,
actual.stream().map(HadoopDruidIndexerMapperTest::rowToMap).collect(Collectors.toList())
);
}
private static Map<String, Object> rowToMap(final InputRow row)
{
// Normalize input row for the purposes of testing.
final ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("t", row.getTimestamp().toString());
for (String dim : row.getDimensions()) {
final Object val = row.getRaw(dim);
if (val != null) {
builder.put(dim, val);
}
}
// Other, non-dimension fields are not self-describing, so they must be specified individually.
builder.put("m1", row.getRaw("m1"));
return builder.build();
}
public static class MyMapper extends HadoopDruidIndexerMapper
{
private final List<InputRow> rows = new ArrayList<>();
@Override
protected void innerMap(
final InputRow inputRow,
final Object value,
final Context context,
final boolean reportParseExceptions
) throws IOException, InterruptedException
{
rows.add(inputRow);
}
public List<InputRow> getRows()
{
return rows;
}
}
}

View File

@ -93,7 +93,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -120,7 +120,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
null,
null,
null,
false
false,
null
),
null
);
@ -148,7 +149,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
null,
null,
null,
false
false,
null
),
null
);
@ -173,7 +175,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
null,
null,
null,
false
false,
null
),
null
);
@ -204,7 +207,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
null,
null,
null,
false
false,
null
),
null
)
@ -236,6 +240,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
jsonMapper
),
new HadoopIOConfig(

View File

@ -82,6 +82,7 @@ public class IndexGeneratorCombinerTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2010/2011"))
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -496,6 +496,7 @@ public class IndexGeneratorJobTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
),
null,
mapper
),
new HadoopIOConfig(

View File

@ -89,6 +89,7 @@ public class JobHelperTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -51,7 +51,8 @@ public class DatasourceIngestionSpecTest
new SelectorDimFilter("dim", "value", null),
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3"),
false
false,
null
);
DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class);
@ -85,7 +86,8 @@ public class DatasourceIngestionSpecTest
null,
null,
null,
false
false,
null
);
Assert.assertEquals(expected, actual);
@ -133,7 +135,8 @@ public class DatasourceIngestionSpecTest
new SelectorDimFilter("dim", "value", null),
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3"),
true
true,
null
);
actual = MAPPER.readValue(
@ -153,7 +156,7 @@ public class DatasourceIngestionSpecTest
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals(
new DatasourceIngestionSpec("test", Intervals.of("2014/2015"), null, null, null, null, null, false),
new DatasourceIngestionSpec("test", Intervals.of("2014/2015"), null, null, null, null, null, false, null),
actual
);
}

View File

@ -69,7 +69,8 @@ public class DatasourceRecordReaderTest
null,
segment.getDimensions(),
segment.getMetrics(),
false
false,
null
)
)
);

View File

@ -79,7 +79,8 @@ public class DatasourcePathSpecTest
null,
null,
null,
false
false,
null
);
segments = ImmutableList.of(
@ -279,6 +280,7 @@ public class DatasourcePathSpecTest
new UniformGranularitySpec(
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2000/3000"))
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -153,6 +153,7 @@ public class GranularityPathSpecTest
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2015-11-06T00:00Z/2015-11-07T00:00Z"))
),
null,
jsonMapper
),
new HadoopIOConfig(null, null, null),
@ -204,6 +205,7 @@ public class GranularityPathSpecTest
Granularities.ALL,
ImmutableList.of(Intervals.of("2015-01-01T11Z/2015-01-02T05Z"))
),
null,
jsonMapper
),
new HadoopIOConfig(null, null, null),

View File

@ -54,7 +54,7 @@ public class StaticPathSpecTest
Job job = new Job();
StaticPathSpec pathSpec = new StaticPathSpec("/a/c,/a/b/{c,d}", null);
DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, jsonMapper);
DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, null, jsonMapper);
HadoopIOConfig io = new HadoopIOConfig(null, null, null);
pathSpec.addInputPaths(new HadoopDruidIndexerConfig(new HadoopIngestionSpec(schema, io, null)), job);

View File

@ -182,6 +182,7 @@ public class HadoopConverterJobTest
Granularities.DAY,
ImmutableList.<Interval>of(interval)
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(

View File

@ -659,6 +659,7 @@ public class IndexTask extends AbstractTask
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
fireDepartmentMetrics.incrementThrownAway();
continue;
}
@ -668,7 +669,6 @@ public class IndexTask extends AbstractTask
continue;
}
final String sequenceName;
if (isGuaranteedRollup) {

View File

@ -42,6 +42,7 @@ import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
@ -281,7 +282,8 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
)
);
return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
}
catch (IOException | SegmentLoadingException e) {
throw Throwables.propagate(e);

View File

@ -54,7 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
id,
taskResource,
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper),
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, null, mapper),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null),
(schema, config, metrics) -> null,

View File

@ -20,6 +20,7 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
@ -48,12 +49,16 @@ import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -209,6 +214,50 @@ public class IndexTaskTest
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
}
@Test
public void testTransformSpec() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
null,
new TransformSpec(
new SelectorDimFilter("dim", "b", null),
ImmutableList.of(
new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil())
)
),
null,
createTuningConfig(2, null, true, false),
false
),
null
);
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
}
@Test
public void testWithArbitraryGranularity() throws Exception
{
@ -1012,6 +1061,18 @@ public class IndexTaskTest
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return createIngestionSpec(baseDir, parseSpec, TransformSpec.NONE, granularitySpec, tuningConfig, appendToExisting);
}
private IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
ParseSpec parseSpec,
TransformSpec transformSpec,
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -1031,6 +1092,7 @@ public class IndexTaskTest
Granularities.MINUTE,
Arrays.asList(Intervals.of("2014/2015"))
),
transformSpec,
jsonMapper
),
new IndexTask.IndexIOConfig(

View File

@ -30,21 +30,20 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
//CHECKSTYLE.OFF: Regexp
import com.metamx.common.logger.Logger;
//CHECKSTYLE.ON: Regexp
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
@ -72,9 +71,13 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.EntryExistsException;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Druids;
@ -91,14 +94,19 @@ import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
@ -130,6 +138,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -140,25 +149,26 @@ import java.util.concurrent.Executor;
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ServiceEmitter emitter = new ServiceEmitter(
"service",
"host",
new LoggingEmitter(
log,
LoggingEmitter.Level.ERROR,
jsonMapper
)
new NoopEmitter()
);
private static final String FAIL_DIM = "__fail__";
private static class TestFirehose implements Firehose
{
private final List<InputRow> queue = Lists.newLinkedList();
private final InputRowParser<Map<String, Object>> parser;
private final List<Map<String, Object>> queue = new LinkedList<>();
private boolean closed = false;
public void addRows(List<InputRow> rows)
public TestFirehose(final InputRowParser<Map<String, Object>> parser)
{
this.parser = parser;
}
public void addRows(List<Map<String, Object>> rows)
{
synchronized (this) {
queue.addAll(rows);
@ -187,8 +197,8 @@ public class RealtimeIndexTaskTest
public InputRow nextRow()
{
synchronized (this) {
final InputRow row = queue.remove(0);
if (row != null && row.getDimensions().contains(FAIL_DIM)) {
final InputRow row = parser.parse(queue.remove(0));
if (row != null && row.getRaw(FAIL_DIM) != null) {
throw new ParseException(FAIL_DIM);
}
return row;
@ -198,14 +208,7 @@ public class RealtimeIndexTaskTest
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// do nothing
}
};
return () -> {};
}
@Override
@ -218,16 +221,17 @@ public class RealtimeIndexTaskTest
}
}
private static class TestFirehoseFactory implements FirehoseFactory
private static class TestFirehoseFactory implements FirehoseFactory<InputRowParser>
{
public TestFirehoseFactory()
{
}
@Override
@SuppressWarnings("unchecked")
public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException, ParseException
{
return new TestFirehose();
return new TestFirehose(parser);
}
}
@ -277,7 +281,7 @@ public class RealtimeIndexTaskTest
public void testHandoffTimeout() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L);
final RealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
@ -289,12 +293,8 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1")
)
);
@ -331,22 +331,10 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0),
ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
)
);
@ -366,8 +354,79 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(0, task.getMetrics().unparseable());
// Do some queries.
Assert.assertEquals(2, sumMetric(task, "rows"));
Assert.assertEquals(3, sumMetric(task, "met1"));
Assert.assertEquals(2, sumMetric(task, null, "rows"));
Assert.assertEquals(3, sumMetric(task, null, "met1"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
Assert.assertEquals(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
),
entry.getKey()
);
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 60_000L)
public void testTransformSpec() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final TransformSpec transformSpec = new TransformSpec(
new SelectorDimFilter("dim1", "foo", null),
ImmutableList.of(
new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
)
);
final RealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
final DataSegment publishedSegment;
// Wait for the firehose to show up; it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0),
ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
)
);
// Stop the firehose; this will drain out the existing events.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Check metrics.
Assert.assertEquals(1, task.getMetrics().processed());
Assert.assertEquals(2, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
// Do some queries.
Assert.assertEquals(1, sumMetric(task, null, "rows"));
Assert.assertEquals(1, sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", null), "rows"));
Assert.assertEquals(0, sumMetric(task, new SelectorDimFilter("dim1t", "barbar", null), "rows"));
Assert.assertEquals(1, sumMetric(task, null, "met1"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
@ -405,27 +464,11 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"),
ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
)
);
@ -446,7 +489,7 @@ public class RealtimeIndexTaskTest
CoreMatchers.allOf(
CoreMatchers.<Throwable>instanceOf(ParseException.class),
ThrowableMessageMatcher.hasMessage(
CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]")
CoreMatchers.containsString("Unable to parse value[foo] for field[met1]")
)
)
)
@ -472,39 +515,24 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
Arrays.<InputRow>asList(
Arrays.asList(
// Good row- will be processed.
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
// Null row- will be unparseable.
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
// Null row- will be thrown away.
null,
// Bad metric- will count as processed, but that particular metric won't update.
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
// Bad row- will be unparseable.
new MapBasedInputRow(
now,
ImmutableList.of("dim1", FAIL_DIM),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
ImmutableMap.of("dim1", "foo", "met1", 2.0, FAIL_DIM, "x"),
// Old row- will be thrownAway.
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0),
// Good row- will be processed.
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
)
);
@ -524,8 +552,8 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(2, task.getMetrics().unparseable());
// Do some queries.
Assert.assertEquals(3, sumMetric(task, "rows"));
Assert.assertEquals(3, sumMetric(task, "met1"));
Assert.assertEquals(3, sumMetric(task, null, "rows"));
Assert.assertEquals(3, sumMetric(task, null, "met1"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
@ -568,12 +596,8 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
)
);
@ -601,17 +625,13 @@ public class RealtimeIndexTaskTest
}
// Do a query, at this point the previous data should be loaded.
Assert.assertEquals(1, sumMetric(task2, "rows"));
Assert.assertEquals(1, sumMetric(task2, null, "rows"));
final TestFirehose firehose = (TestFirehose) task2.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar")
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim2", "bar")
)
);
@ -626,7 +646,7 @@ public class RealtimeIndexTaskTest
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(2, sumMetric(task2, "rows"));
Assert.assertEquals(2, sumMetric(task2, null, "rows"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
@ -671,12 +691,8 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
)
);
@ -691,7 +707,7 @@ public class RealtimeIndexTaskTest
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(1, sumMetric(task1, "rows"));
Assert.assertEquals(1, sumMetric(task1, null, "rows"));
// Trigger graceful shutdown.
task1.stopGracefully();
@ -768,12 +784,8 @@ public class RealtimeIndexTaskTest
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
)
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
)
);
@ -863,22 +875,40 @@ public class RealtimeIndexTaskTest
private RealtimeIndexTask makeRealtimeTask(final String taskId)
{
return makeRealtimeTask(taskId, true, 0);
return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
{
return makeRealtimeTask(taskId, reportParseExceptions, 0);
return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout)
private RealtimeIndexTask makeRealtimeTask(
final String taskId,
final TransformSpec transformSpec,
final boolean reportParseExceptions,
final long handoffTimeout
)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
TestHelper.getJsonMapper().convertValue(
new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("t", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t")),
null,
null
)
)
),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
),
new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
transformSpec,
objectMapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
@ -1065,11 +1095,12 @@ public class RealtimeIndexTaskTest
return toolboxFactory.build(task);
}
public long sumMetric(final Task task, final String metric) throws Exception
public long sumMetric(final Task task, final DimFilter filter, final String metric) throws Exception
{
// Do a query.
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test_ds")
.filters(filter)
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory(metric, metric)

View File

@ -185,6 +185,7 @@ public class TaskSerdeTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null,
jsonMapper
),
new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
@ -247,6 +248,7 @@ public class TaskSerdeTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null,
jsonMapper
),
new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
@ -469,6 +471,7 @@ public class TaskSerdeTest
null,
new AggregatorFactory[0],
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
),
new RealtimeIOConfig(
@ -760,6 +763,7 @@ public class TaskSerdeTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
jsonMapper
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar"), null, null), null
),

View File

@ -33,14 +33,13 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.guice.GuiceAnnotationIntrospector;
import io.druid.guice.GuiceInjectableValues;
@ -56,8 +55,11 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -65,8 +67,12 @@ import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -115,7 +121,7 @@ public class IngestSegmentFirehoseFactoryTest
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
MAPPER = setupInjectablesInObjectMapper(TestHelper.getJsonMapper());
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO();
}
@ -394,7 +400,9 @@ public class IngestSegmentFirehoseFactoryTest
)
{
this.factory = factory;
this.rowParser = rowParser;
// Must decorate the parser, since IngestSegmentFirehoseFactory will undecorate it.
this.rowParser = TransformSpec.NONE.decorate(rowParser);
}
private static final Logger log = new Logger(IngestSegmentFirehoseFactoryTest.class);
@ -420,15 +428,13 @@ public class IngestSegmentFirehoseFactoryTest
private final InputRowParser rowParser;
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimeAndDimsParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)),
ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
ImmutableList.<SpatialDimensionSchema>of()
),
null,
null
ImmutableList.of()
)
)
);
@ -533,6 +539,42 @@ public class IngestSegmentFirehoseFactoryTest
Assert.assertEquals((int) MAX_SHARD_NUMBER * MAX_ROWS, (int) rowcount);
}
@Test
public void testTransformSpec() throws IOException
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
final TransformSpec transformSpec = new TransformSpec(
new SelectorDimFilter(Column.TIME_COLUMN_NAME, "1", null),
ImmutableList.of(
new ExpressionTransform(METRIC_FLOAT_NAME, METRIC_FLOAT_NAME + " * 10", ExprMacroTable.nil())
)
);
int skipped = 0;
try (final IngestSegmentFirehose firehose =
(IngestSegmentFirehose)
factory.connect(transformSpec.decorate(rowParser), null)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
if (row == null) {
skipped++;
continue;
}
Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME).longValue());
Assert.assertEquals(
METRIC_FLOAT_VALUE * 10,
row.getMetric(METRIC_FLOAT_NAME).floatValue(),
METRIC_FLOAT_VALUE * 0.0001
);
++rowcount;
}
}
Assert.assertEquals(90, skipped);
Assert.assertEquals((int) MAX_ROWS, (int) rowcount);
}
private static ServiceEmitter newMockEmitter()
{
return new NoopServiceEmitter();

View File

@ -59,6 +59,7 @@ import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
@ -91,16 +92,20 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private static final String TIME_COLUMN = "t";
private static final String[] DIMENSIONS = new String[]{"d1"};
private static final String[] METRICS = new String[]{"m1"};
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)),
// Must decorate the parser, since IngestSegmentFirehoseFactory will undecorate it.
private static final InputRowParser<Map<String, Object>> ROW_PARSER = TransformSpec.NONE.decorate(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)),
null,
null
),
null,
null
),
null,
null
)
)
);

View File

@ -129,6 +129,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@ -257,6 +258,7 @@ public class TaskLifecycleTest
return true;
}
@Nullable
@Override
public InputRow nextRow()
{
@ -311,6 +313,7 @@ public class TaskLifecycleTest
return inputRowIterator.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{
@ -661,6 +664,7 @@ public class TaskLifecycleTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null,
mapper
),
new IndexIOConfig(new MockFirehoseFactory(false), false),
@ -718,6 +722,7 @@ public class TaskLifecycleTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
mapper
),
new IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
@ -1082,6 +1087,7 @@ public class TaskLifecycleTest
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null,
mapper
),
new IndexIOConfig(new MockFirehoseFactory(false), false),
@ -1183,6 +1189,7 @@ public class TaskLifecycleTest
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
mapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(

View File

@ -58,7 +58,7 @@ public class TaskAnnouncementTest
"theid",
new TaskResource("rofl", 2),
new FireDepartment(
new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()),
new DataSchema("foo", null, new AggregatorFactory[0], null, null, new DefaultObjectMapper()),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{

View File

@ -50,6 +50,7 @@ public class DataSchema
private final Map<String, Object> parser;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final TransformSpec transformSpec;
private final ObjectMapper jsonMapper;
@ -61,12 +62,14 @@ public class DataSchema
@JsonProperty("parser") Map<String, Object> parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("transformSpec") TransformSpec transformSpec,
@JacksonInject ObjectMapper jsonMapper
)
{
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper.");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource.");
this.parser = parser;
this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec;
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
@ -114,7 +117,9 @@ public class DataSchema
return cachedParser;
}
final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
final InputRowParser inputRowParser = transformSpec.decorate(
jsonMapper.convertValue(this.parser, InputRowParser.class)
);
final Set<String> dimensionExclusions = Sets.newHashSet();
for (AggregatorFactory aggregator : aggregators) {
@ -149,12 +154,12 @@ public class DataSchema
cachedParser = inputRowParser.withParseSpec(
inputRowParser.getParseSpec()
.withDimensionsSpec(
dimensionsSpec
.withDimensionExclusions(
Sets.difference(dimensionExclusions, dimSet)
.withDimensionsSpec(
dimensionsSpec
.withDimensionExclusions(
Sets.difference(dimensionExclusions, dimSet)
)
)
)
);
} else {
cachedParser = inputRowParser;
@ -179,9 +184,20 @@ public class DataSchema
return granularitySpec;
}
@JsonProperty
public TransformSpec getTransformSpec()
{
return transformSpec;
}
public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
{
return new DataSchema(dataSource, parser, aggregators, granularitySpec, jsonMapper);
return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
}
public DataSchema withTransformSpec(TransformSpec transformSpec)
{
return new DataSchema(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
}
@Override
@ -192,6 +208,7 @@ public class DataSchema
", parser=" + parser +
", aggregators=" + Arrays.toString(aggregators) +
", granularitySpec=" + granularitySpec +
", transformSpec=" + transformSpec +
'}';
}
}
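
Illustrative sketch (not part of this patch): wiring a transformSpec into a DataSchema follows the same pattern as the updated tests above, and getParser() then returns a parser decorated with it. The class name and the "example_ds" datasource are hypothetical.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;

public class DataSchemaWithTransformExample
{
  public static DataSchema makeSchema()
  {
    final ObjectMapper jsonMapper = TestHelper.getJsonMapper();
    return new DataSchema(
        "example_ds",
        // The parser is stored as a Map, so convert it the same way the tests do.
        jsonMapper.convertValue(
            new MapInputRowParser(
                new TimeAndDimsParseSpec(
                    new TimestampSpec("t", "auto", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t")), null, null)
                )
            ),
            JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
        ),
        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
        // Keep rows where dim1 == "foo" and add a derived "dim1t" field.
        new TransformSpec(
            new SelectorDimFilter("dim1", "foo", null),
            ImmutableList.of(new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()))
        ),
        jsonMapper
    );
  }
}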

View File

@ -0,0 +1,125 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.Row;
import io.druid.math.expr.Expr;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.column.Column;
import java.util.Objects;
public class ExpressionTransform implements Transform
{
private final String name;
private final String expression;
private final ExprMacroTable macroTable;
@JsonCreator
public ExpressionTransform(
@JsonProperty("name") final String name,
@JsonProperty("expression") final String expression,
@JacksonInject ExprMacroTable macroTable
)
{
this.name = Preconditions.checkNotNull(name, "name");
this.expression = Preconditions.checkNotNull(expression, "expression");
this.macroTable = macroTable;
}
@JsonProperty
@Override
public String getName()
{
return name;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
public RowFunction getRowFunction()
{
final Expr expr = Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable"));
return new ExpressionRowFunction(expr);
}
static class ExpressionRowFunction implements RowFunction
{
private final Expr expr;
ExpressionRowFunction(final Expr expr)
{
this.expr = expr;
}
@Override
public Object eval(final Row row)
{
return expr.eval(name -> getValueFromRow(row, name)).value();
}
}
private static Object getValueFromRow(final Row row, final String column)
{
if (column.equals(Column.TIME_COLUMN_NAME)) {
return row.getTimestampFromEpoch();
} else {
return row.getRaw(column);
}
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ExpressionTransform that = (ExpressionTransform) o;
return Objects.equals(name, that.name) &&
Objects.equals(expression, that.expression);
}
@Override
public int hashCode()
{
return Objects.hash(name, expression);
}
@Override
public String toString()
{
return "ExpressionTransform{" +
"name='" + name + '\'' +
", expression='" + expression + '\'' +
'}';
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import io.druid.data.input.Row;
/**
* Interface for evaluating functions on rows. Used by {@link Transformer}.
*/
public interface RowFunction
{
Object eval(Row row);
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each
* one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc.
* Each also has a "row function", which produces values for this new field based on looking at the entire input row.
*
* If a transform has the same name as a field in an input row, then it will shadow the original field. Transforms
* that shadow fields may still refer to the fields they shadow. This can be used to transform a field "in-place".
*
* Transforms do have some limitations. They can only refer to fields present in the actual input rows; in particular,
* they cannot refer to other transforms. And they cannot remove fields, only add them. However, they can shadow a
* field with another field containing all nulls, which will act similarly to removing the field.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "expression", value = ExpressionTransform.class)
})
public interface Transform
{
/**
* Returns the field name for this transform.
*/
String getName();
/**
* Returns the function for this transform. The RowFunction takes an entire row as input and returns a column value
* as output.
*/
RowFunction getRowFunction();
}
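
Illustrative sketch (not part of this patch): a single "expression" transform evaluated on its own, mirroring the concat(dim1,dim1) transform used in the tests above. The class name and the example row are hypothetical; the constructors and methods are the ones introduced in this commit.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.DateTimes;
import io.druid.math.expr.ExprMacroTable;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.RowFunction;
import io.druid.segment.indexing.Transform;

public class TransformExample
{
  public static void main(String[] args)
  {
    // A transform named "dim1t" whose value is computed from the whole input row.
    final Transform transform = new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil());
    final RowFunction rowFunction = transform.getRowFunction();

    final MapBasedInputRow row = new MapBasedInputRow(
        DateTimes.utc(0),
        ImmutableList.of("dim1"),
        ImmutableMap.<String, Object>of("dim1", "foo")
    );

    // Evaluates the expression against the row: concat("foo", "foo") -> "foofoo".
    System.out.println(rowFunction.eval(row));
  }
}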

View File

@ -0,0 +1,147 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.ISE;
import io.druid.query.filter.DimFilter;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Specifies how input rows should be filtered and transformed. There are two parts: a "filter" (which can filter out
* input rows) and "transforms" (which can add fields to input rows). Filters may refer to fields generated by
* a transform.
*
* See {@link Transform} for details on how each transform works.
*/
public class TransformSpec
{
public static final TransformSpec NONE = new TransformSpec(null, null);
private final DimFilter filter;
private final List<Transform> transforms;
@JsonCreator
public TransformSpec(
@JsonProperty("filter") final DimFilter filter,
@JsonProperty("transforms") final List<Transform> transforms
)
{
this.filter = filter;
this.transforms = transforms == null ? ImmutableList.of() : transforms;
// Check for name collisions.
final Set<String> seen = new HashSet<>();
for (Transform transform : this.transforms) {
if (!seen.add(transform.getName())) {
throw new ISE("Transform name '%s' cannot be used twice", transform.getName());
}
}
}
public static <T> TransformSpec fromInputRowParser(final InputRowParser<T> parser)
{
// Hack: some firehoses and input specs must extract transformSpec from the parser, since they do not
// actually use the parser, but still must respect the transformSpec. This method should extract whatever
// transformSpec "decorate" had put in.
if (parser instanceof TransformingInputRowParser) {
return ((TransformingInputRowParser) parser).getTransformSpec();
} else if (parser instanceof TransformingStringInputRowParser) {
return ((TransformingStringInputRowParser) parser).getTransformSpec();
} else {
throw new ISE("Parser was not decorated, but should have been");
}
}
@JsonProperty
@Nullable
public DimFilter getFilter()
{
return filter;
}
@JsonProperty
public List<Transform> getTransforms()
{
return transforms;
}
public <T> InputRowParser<T> decorate(final InputRowParser<T> parser)
{
// Always decorates, even if the transformSpec is a no-op. This is so fromInputRowParser can insist that the
// parser is a transforming parser, and possibly help detect coding errors where someone forgot to call "decorate".
if (parser instanceof StringInputRowParser) {
// Hack to support the fact that some callers use special methods in StringInputRowParser, such as
// parse(String) and startFileFromBeginning.
return (InputRowParser<T>) new TransformingStringInputRowParser(
parser.getParseSpec(),
((StringInputRowParser) parser).getEncoding(),
this
);
} else {
return new TransformingInputRowParser<>(parser, this);
}
}
public Transformer toTransformer()
{
return new Transformer(this);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TransformSpec that = (TransformSpec) o;
return Objects.equals(filter, that.filter) &&
Objects.equals(transforms, that.transforms);
}
@Override
public int hashCode()
{
return Objects.hash(filter, transforms);
}
@Override
public String toString()
{
return "TransformSpec{" +
"filter=" + filter +
", transforms=" + transforms +
'}';
}
}
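
Illustrative sketch (not part of this patch): the main entry point for callers is decorate(), which is what DataSchema.getParser() now invokes. The hypothetical DecorateExample below shows that the decorated parser both evaluates the transform and applies the filter while parsing; the field names mirror the tests above.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import java.util.Map;

public class DecorateExample
{
  public static void main(String[] args)
  {
    final TransformSpec transformSpec = new TransformSpec(
        new SelectorDimFilter("dim1", "foo", null),
        ImmutableList.of(new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()))
    );

    // Decorate a plain map parser; the result is a TransformingInputRowParser.
    final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(
        new MapInputRowParser(
            new TimeAndDimsParseSpec(
                new TimestampSpec("t", "auto", null),
                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim1t")), null, null)
            )
        )
    );

    // Matches the filter, so a row comes back with the derived "dim1t" field.
    final InputRow kept = parser.parse(ImmutableMap.<String, Object>of("t", "2000-01-01T00:00:00.000Z", "dim1", "foo"));
    System.out.println(kept.getDimension("dim1t")); // [foofoo]

    // Rejected by the filter, so the decorated parser returns null.
    final InputRow dropped = parser.parse(ImmutableMap.<String, Object>of("t", "2000-01-01T00:00:00.000Z", "dim1", "bar"));
    System.out.println(dropped); // null
  }
}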

View File

@ -0,0 +1,193 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.java.util.common.DateTimes;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Applies a {@link TransformSpec} to input rows: evaluates the transforms to add new fields, and drops
 * rows that do not match the filter.
 */
public class Transformer
{
private final Map<String, RowFunction> transforms = new HashMap<>();
private final ThreadLocal<Row> rowSupplierForValueMatcher = new ThreadLocal<>();
private final ValueMatcher valueMatcher;
Transformer(final TransformSpec transformSpec)
{
for (final Transform transform : transformSpec.getTransforms()) {
transforms.put(transform.getName(), transform.getRowFunction());
}
if (transformSpec.getFilter() != null) {
valueMatcher = transformSpec.getFilter().toFilter()
.makeMatcher(
RowBasedColumnSelectorFactory.create(
rowSupplierForValueMatcher,
null
)
);
} else {
valueMatcher = null;
}
}
/**
* Transforms an input row, or returns null if the row should be filtered out.
*
* @param row the input row, or null
*
* @return the transformed row, or null if the input was null or the row did not match the filter
*/
@Nullable
public InputRow transform(@Nullable final InputRow row)
{
if (row == null) {
return null;
}
final InputRow transformedRow;
if (transforms.isEmpty()) {
transformedRow = row;
} else {
transformedRow = new TransformedInputRow(row, transforms);
}
if (valueMatcher != null) {
rowSupplierForValueMatcher.set(transformedRow);
if (!valueMatcher.matches()) {
return null;
}
}
return transformedRow;
}
public static class TransformedInputRow implements InputRow
{
private final InputRow row;
private final Map<String, RowFunction> transforms;
public TransformedInputRow(final InputRow row, final Map<String, RowFunction> transforms)
{
this.row = row;
this.transforms = transforms;
}
@Override
public List<String> getDimensions()
{
return row.getDimensions();
}
@Override
public long getTimestampFromEpoch()
{
final RowFunction transform = transforms.get(Column.TIME_COLUMN_NAME);
if (transform != null) {
return Rows.objectToNumber(Column.TIME_COLUMN_NAME, transform.eval(row)).longValue();
} else {
return row.getTimestampFromEpoch();
}
}
@Override
public DateTime getTimestamp()
{
final RowFunction transform = transforms.get(Column.TIME_COLUMN_NAME);
if (transform != null) {
return DateTimes.utc(getTimestampFromEpoch());
} else {
return row.getTimestamp();
}
}
@Override
public List<String> getDimension(final String dimension)
{
final RowFunction transform = transforms.get(dimension);
if (transform != null) {
return Rows.objectToStrings(transform.eval(row));
} else {
return row.getDimension(dimension);
}
}
@Override
public Object getRaw(final String column)
{
final RowFunction transform = transforms.get(column);
if (transform != null) {
return transform.eval(row);
} else {
return row.getRaw(column);
}
}
@Override
public Number getMetric(final String metric)
{
final RowFunction transform = transforms.get(metric);
if (transform != null) {
return Rows.objectToNumber(metric, transform.eval(row));
} else {
return row.getMetric(metric);
}
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TransformedInputRow that = (TransformedInputRow) o;
return Objects.equals(row, that.row) &&
Objects.equals(transforms, that.transforms);
}
@Override
public int hashCode()
{
return Objects.hash(row, transforms);
}
@Override
public int compareTo(final Row o)
{
return row.compareTo(o);
}
}
}
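
Illustrative sketch (not part of this patch): code paths that do not parse through a decorated parser, such as IngestSegmentFirehose, recover the spec with TransformSpec.fromInputRowParser and then apply the Transformer directly; a null return means the row was filtered out, which callers like IndexTask count as thrown away. The wrapper class below is hypothetical.

import io.druid.data.input.InputRow;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.Transformer;

public class TransformerSketch
{
  private final Transformer transformer;

  public TransformerSketch(final TransformSpec transformSpec)
  {
    this.transformer = transformSpec.toTransformer();
  }

  // Returns the transformed row, or null if the filter rejected it (or the input was null).
  public InputRow apply(final InputRow row)
  {
    return transformer.transform(row);
  }
}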

View File

@ -0,0 +1,62 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
public class TransformingInputRowParser<T> implements InputRowParser<T>
{
private final InputRowParser<T> parser;
private final TransformSpec transformSpec;
private final Transformer transformer;
public TransformingInputRowParser(final InputRowParser<T> parser, final TransformSpec transformSpec)
{
this.parser = parser;
this.transformSpec = transformSpec;
this.transformer = transformSpec.toTransformer();
}
@Override
public InputRow parse(final T row)
{
return transformer.transform(parser.parse(row));
}
@Override
public ParseSpec getParseSpec()
{
return parser.getParseSpec();
}
@Override
@SuppressWarnings("unchecked")
public InputRowParser<T> withParseSpec(final ParseSpec parseSpec)
{
return new TransformingInputRowParser<>(parser.withParseSpec(parseSpec), transformSpec);
}
public TransformSpec getTransformSpec()
{
return transformSpec;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
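/**
 * {@link StringInputRowParser} counterpart of {@link TransformingInputRowParser}, for call sites that
 * specifically require a StringInputRowParser; both the ByteBuffer and String parse paths apply the
 * transformer.
 */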
public class TransformingStringInputRowParser extends StringInputRowParser
{
private final TransformSpec transformSpec;
private final Transformer transformer;
public TransformingStringInputRowParser(
final ParseSpec parseSpec,
final String encoding,
final TransformSpec transformSpec
)
{
super(parseSpec, encoding);
this.transformSpec = transformSpec;
this.transformer = transformSpec.toTransformer();
}
@Override
public InputRow parse(final ByteBuffer input)
{
return transformer.transform(super.parse(input));
}
@Nullable
@Override
public InputRow parse(@Nullable final String input)
{
return transformer.transform(super.parse(input));
}
@Override
public StringInputRowParser withParseSpec(final ParseSpec parseSpec)
{
return new TransformingStringInputRowParser(parseSpec, getEncoding(), transformSpec);
}
public TransformSpec getTransformSpec()
{
return transformSpec;
}
}

View File

@ -29,6 +29,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@ -110,6 +111,7 @@ public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
return currentFirehose.hasMore();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -37,7 +37,7 @@ import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.InputRowParser;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.DateTimes;
@ -52,6 +52,7 @@ import io.druid.server.security.ResourceAction;
import io.druid.server.security.ResourceType;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
@ -81,7 +82,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
 * firehoses with a {@link ServiceAnnouncingChatHandlerProvider}.
*/
public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser>
public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>>
{
public static final int MAX_FIREHOSE_PRODUCERS = 10_000;
@ -119,7 +120,10 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
}
@Override
public Firehose connect(MapInputRowParser firehoseParser, File temporaryDirectory) throws IOException
public Firehose connect(
InputRowParser<Map<String, Object>> firehoseParser,
File temporaryDirectory
) throws IOException
{
log.info("Connecting firehose: %s", serviceName);
final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser);
@ -155,7 +159,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
{
private final ScheduledExecutorService exec;
private final BlockingQueue<InputRow> buffer;
private final MapInputRowParser parser;
private final InputRowParser<Map<String, Object>> parser;
private final Object readLock = new Object();
@ -165,7 +169,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
private final ConcurrentMap<String, Long> producerSequences = new ConcurrentHashMap<>();
public EventReceiverFirehose(MapInputRowParser parser)
public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
@ -185,7 +189,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
req,
new ResourceAction(
new Resource("STATE", ResourceType.STATE),
            Action.WRITE
),
authorizerMapper
);
@ -264,6 +268,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
}
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -27,6 +27,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@ -74,6 +75,7 @@ public class FixedCountFirehoseFactory implements FirehoseFactory
return i < count && delegateFirehose.hasMore();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -43,6 +43,8 @@ import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.indexing.Transformer;
import io.druid.utils.Runnables;
import javax.annotation.Nullable;
@ -53,15 +55,19 @@ import java.util.Map;
public class IngestSegmentFirehose implements Firehose
{
private final Transformer transformer;
private Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(
final List<WindowedStorageAdapter> adapters,
final TransformSpec transformSpec,
final List<String> dims,
final List<String> metrics,
final DimFilter dimFilter
)
{
this.transformer = transformSpec.toTransformer();
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<WindowedStorageAdapter, Sequence<InputRow>>()
@ -184,12 +190,13 @@ public class IngestSegmentFirehose implements Firehose
return !rowYielder.isDone();
}
@Nullable
@Override
public InputRow nextRow()
{
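      // transform() can return null when the transformSpec's filter rejects the row, hence the @Nullable on nextRow().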
final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return inputRow;
return transformer.transform(inputRow);
}
@Override

View File

@ -33,11 +33,13 @@ import com.ircclouds.irc.api.state.IIRCState;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -61,7 +63,7 @@ import java.util.concurrent.TimeUnit;
* );
* }</pre>
*/
public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser>
public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<DateTime, ChannelPrivMsg>>>
{
private static final Logger log = new Logger(IrcFirehoseFactory.class);
@ -101,7 +103,10 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser>
}
@Override
public Firehose connect(final IrcInputRowParser firehoseParser, File temporaryDirectory) throws IOException
public Firehose connect(
final InputRowParser<Pair<DateTime, ChannelPrivMsg>> firehoseParser,
final File temporaryDirectory
) throws IOException
{
final IRCApi irc = new IRCApiImpl(false);
final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();
@ -212,6 +217,7 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser>
}
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -25,6 +25,7 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@ -71,6 +72,7 @@ public class PredicateFirehose implements Firehose
return false;
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -29,6 +29,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
@ -102,6 +103,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
return firehose.hasMore();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -20,7 +20,6 @@
package io.druid.segment.realtime.plumber;
import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
@ -62,13 +61,9 @@ public class Plumbers
}
if (inputRow == null) {
if (reportParseExceptions) {
throw new ParseException("null input row");
} else {
log.debug("Discarded null input row, considering unparseable.");
metrics.incrementUnparseable();
return;
}
log.debug("Discarded null row, considering thrownAway.");
metrics.incrementThrownAway();
return;
}
final int numRows;

View File

@ -33,6 +33,7 @@ import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@ -132,6 +133,7 @@ public class CombiningFirehoseFactoryTest
return iterator.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -21,12 +21,15 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.DurationGranularity;
@ -34,6 +37,8 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.expression.TestExprMacroTable;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.hamcrest.CoreMatchers;
@ -42,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
@ -75,6 +81,7 @@ public class DataSchemaTest
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper
);
@ -111,6 +118,7 @@ public class DataSchemaTest
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper
);
@ -120,6 +128,63 @@ public class DataSchemaTest
);
}
@Test
public void testTransformSpec() throws Exception
{
Map<String, Object> parserMap = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")),
ImmutableList.of(),
null
),
null,
null
),
null
), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
DataSchema schema = new DataSchema(
"test",
parserMap,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
new TransformSpec(
new SelectorDimFilter("dimA", "foo", null),
ImmutableList.of(
new ExpressionTransform("expr", "concat(dimA,dimA)", TestExprMacroTable.INSTANCE)
)
),
jsonMapper
);
    // Test hack that produces a StringInputRowParser; the transform-decorated parser must still be castable to StringInputRowParser for this to work.
final StringInputRowParser parser = (StringInputRowParser) schema.getParser();
final InputRow row1bb = parser.parse(
ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}".getBytes(Charsets.UTF_8))
);
Assert.assertEquals(DateTimes.of("2000-01-01"), row1bb.getTimestamp());
Assert.assertEquals("foo", row1bb.getRaw("dimA"));
Assert.assertEquals("foofoo", row1bb.getRaw("expr"));
final InputRow row1string = parser.parse("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}");
Assert.assertEquals(DateTimes.of("2000-01-01"), row1string.getTimestamp());
Assert.assertEquals("foo", row1string.getRaw("dimA"));
Assert.assertEquals("foofoo", row1string.getRaw("expr"));
final InputRow row2 = parser.parse(
ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"x\"}".getBytes(Charsets.UTF_8))
);
Assert.assertNull(row2);
}
@Test(expected = IAE.class)
public void testOverlapMetricNameAndDim() throws Exception
{
@ -148,6 +213,7 @@ public class DataSchemaTest
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper
);
schema.getParser();
@ -181,6 +247,7 @@ public class DataSchemaTest
new DoubleSumAggregatorFactory("metric1", "col3"),
},
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
jsonMapper
);
schema.getParser();
@ -255,7 +322,7 @@ public class DataSchemaTest
null
)
);
Assert.assertEquals(
Assert.assertArrayEquals(
actual.getAggregators(),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1")

View File

@ -0,0 +1,203 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.DateTimes;
import io.druid.query.expression.TestExprMacroTable;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class TransformSpecTest
{
private static final MapInputRowParser PARSER = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("t", "auto", DateTimes.of("2000-01-01")),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("f", "x", "y")),
null,
null
)
)
);
private static final Map<String, Object> ROW1 = ImmutableMap.<String, Object>builder()
.put("x", "foo")
.put("y", "bar")
.put("a", 2.0)
.put("b", 3L)
.build();
private static final Map<String, Object> ROW2 = ImmutableMap.<String, Object>builder()
.put("x", "foo")
.put("y", "baz")
.put("a", 2.0)
.put("b", 4L)
.build();
@Test
public void testTransforms()
{
final TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE),
new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE),
new ExpressionTransform("h", "concat(f,g)", TestExprMacroTable.INSTANCE)
)
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(DateTimes.of("2000-01-01"), row.getTimestamp());
Assert.assertEquals(ImmutableList.of("f", "x", "y"), row.getDimensions());
Assert.assertEquals(ImmutableList.of("foo"), row.getDimension("x"));
Assert.assertEquals(3.0, row.getMetric("b").doubleValue(), 0);
Assert.assertEquals("foobar", row.getRaw("f"));
Assert.assertEquals(ImmutableList.of("foobar"), row.getDimension("f"));
Assert.assertEquals(ImmutableList.of("5.0"), row.getDimension("g"));
Assert.assertEquals(ImmutableList.of(), row.getDimension("h"));
Assert.assertEquals(5L, row.getMetric("g").longValue());
}
@Test
public void testTransformOverwriteField()
{
// Transforms are allowed to overwrite fields, and to refer to the fields they overwrite; double-check this.
final TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("x", "concat(x,y)", TestExprMacroTable.INSTANCE)
)
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(DateTimes.of("2000-01-01"), row.getTimestamp());
Assert.assertEquals(ImmutableList.of("f", "x", "y"), row.getDimensions());
Assert.assertEquals(ImmutableList.of("foobar"), row.getDimension("x"));
Assert.assertEquals(3.0, row.getMetric("b").doubleValue(), 0);
Assert.assertNull(row.getRaw("f"));
}
@Test
public void testFilterOnTransforms()
{
// Filters are allowed to refer to transformed fields; double-check this.
final TransformSpec transformSpec = new TransformSpec(
new AndDimFilter(
ImmutableList.of(
new SelectorDimFilter("x", "foo", null),
new SelectorDimFilter("f", "foobar", null),
new SelectorDimFilter("g", "5.0", null)
)
),
ImmutableList.of(
new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE),
new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE)
)
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
Assert.assertNotNull(parser.parse(ROW1));
Assert.assertNull(parser.parse(ROW2));
}
@Test
public void testTransformTimeFromOtherFields()
{
final TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("__time", "(a + b) * 3600000", TestExprMacroTable.INSTANCE)
)
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("1970-01-01T05:00:00Z"), row.getTimestamp());
Assert.assertEquals(DateTimes.of("1970-01-01T05:00:00Z").getMillis(), row.getTimestampFromEpoch());
}
@Test
public void testTransformTimeFromTime()
{
final TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("__time", "__time + 3600000", TestExprMacroTable.INSTANCE)
)
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01T01:00:00Z"), row.getTimestamp());
Assert.assertEquals(DateTimes.of("2000-01-01T01:00:00Z").getMillis(), row.getTimestampFromEpoch());
}
@Test
public void testSerde() throws Exception
{
final TransformSpec transformSpec = new TransformSpec(
new AndDimFilter(
ImmutableList.of(
new SelectorDimFilter("x", "foo", null),
new SelectorDimFilter("f", "foobar", null),
new SelectorDimFilter("g", "5.0", null)
)
),
ImmutableList.of(
new ExpressionTransform("f", "concat(x,y)", TestExprMacroTable.INSTANCE),
new ExpressionTransform("g", "a + b", TestExprMacroTable.INSTANCE)
)
);
final ObjectMapper jsonMapper = TestHelper.getJsonMapper();
Assert.assertEquals(
transformSpec,
jsonMapper.readValue(jsonMapper.writeValueAsString(transformSpec), TransformSpec.class)
);
}
}

View File

@ -97,6 +97,7 @@ public class FireDepartmentTest
new CountAggregatorFactory("count")
},
new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null),
null,
jsonMapper
),
new RealtimeIOConfig(

View File

@ -145,6 +145,7 @@ public class RealtimeManagerTest
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
);
schema2 = new DataSchema(
@ -152,6 +153,7 @@ public class RealtimeManagerTest
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
@ -295,6 +297,7 @@ public class RealtimeManagerTest
null,
new AggregatorFactory[]{new CountAggregatorFactory("ignore")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
);
@ -331,8 +334,8 @@ public class RealtimeManagerTest
}
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable());
Assert.assertEquals(2, realtimeManager.getMetrics("test").thrownAway());
Assert.assertEquals(1, realtimeManager.getMetrics("test").unparseable());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(0, plumber.getPersistCount());
@ -859,6 +862,7 @@ public class RealtimeManagerTest
return rows.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{

View File

@ -21,20 +21,17 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
//CHECKSTYLE.OFF: Regexp
import com.metamx.common.logger.Logger;
//CHECKSTYLE.ON: Regexp
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@ -130,6 +127,7 @@ public class AppenderatorTester implements AutoCloseable
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null,
objectMapper
);
@ -170,11 +168,7 @@ public class AppenderatorTester implements AutoCloseable
emitter = new ServiceEmitter(
"test",
"test",
new LoggingEmitter(
new Logger(AppenderatorTester.class),
LoggingEmitter.Level.INFO,
objectMapper
)
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);

View File

@ -128,6 +128,7 @@ public class DefaultOfflineAppenderatorFactoryTest
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
null,
objectMapper
);

View File

@ -47,6 +47,7 @@ import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.indexing.TransformSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -117,6 +118,7 @@ public class IngestSegmentFirehoseTest
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
final IngestSegmentFirehose firehose = new IngestSegmentFirehose(
ImmutableList.of(wsa, wsa),
TransformSpec.NONE,
ImmutableList.of("host", "spatial"),
ImmutableList.of("visited_sum", "unique_hosts"),
null
@ -149,6 +151,7 @@ public class IngestSegmentFirehoseTest
// Do a spatial filter
final IngestSegmentFirehose firehose2 = new IngestSegmentFirehose(
ImmutableList.of(new WindowedStorageAdapter(queryable, Intervals.of("2000/3000"))),
TransformSpec.NONE,
ImmutableList.of("host", "spatial"),
ImmutableList.of("visited_sum", "unique_hosts"),
new SpatialDimFilter("spatial", new RadiusBound(new float[]{1, 0}, 0.1f))

View File

@ -142,6 +142,7 @@ public class RealtimePlumberSchoolTest
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
);
@ -161,6 +162,7 @@ public class RealtimePlumberSchoolTest
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.YEAR, Granularities.NONE, null),
null,
jsonMapper
);

View File

@ -53,6 +53,7 @@ public class SinkTest
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null),
null,
new DefaultObjectMapper()
);

View File

@ -150,6 +150,7 @@ public class DruidJsonValidatorTest
null,
new AggregatorFactory[0],
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
),
new RealtimeIOConfig(