Parse Batch support (#5081)

* add parseBatch and deprecate parse method in InputRowParser

add addAll method, skip max rows in memory check for it

remove parse method from implementations

transform transformers

add string multiplier input row parser

fix withParseSpec

fix kafka batch indexing

fix isPersistRequired

comments

* add unit test

* make persist async

* review comments
Parag Jain 2017-12-04 16:06:16 -06:00 committed by Himanshu
parent c3bb03dcec
commit 7c01f77b04
95 changed files with 814 additions and 461 deletions
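For orientation, here is a minimal sketch (not part of this diff) of what the migration looks like for a hypothetical single-row parser. The class name is invented, and it assumes the interface also declares withParseSpec(ParseSpec), as the "fix withParseSpec" bullet suggests:

import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;

import java.util.List;
import java.util.Map;

// Hypothetical single-row parser: instead of overriding the deprecated parse(...),
// it overrides parseBatch(...) and wraps its one row in an immutable list.
public class ExampleMapRowParser implements InputRowParser<Map<String, Object>>
{
  private final ParseSpec parseSpec;

  public ExampleMapRowParser(ParseSpec parseSpec)
  {
    this.parseSpec = parseSpec;
  }

  @Override
  public List<InputRow> parseBatch(Map<String, Object> theMap)
  {
    return ImmutableList.of(
        new MapBasedInputRow(
            parseSpec.getTimestampSpec().extractTimestamp(theMap).getMillis(),
            parseSpec.getDimensionsSpec().getDimensionNames(),
            theMap
        )
    );
  }

  @Override
  public ParseSpec getParseSpec()
  {
    return parseSpec;
  }

  @Override
  public InputRowParser withParseSpec(ParseSpec parseSpec)
  {
    return new ExampleMapRowParser(parseSpec);
  }
}

Multi-row parsers can instead return several entries from a single input, which is the point of the new contract.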

View File

@ -23,8 +23,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.collect.Utils;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
@ExtensionPoint
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@ -35,12 +38,27 @@ import javax.annotation.Nullable;
})
public interface InputRowParser<T>
{
/**
* Parse an input into a list of {@link InputRow}. The list can contain nulls for rows that should be thrown away,
* or this method can throw {@code ParseException} if the input is unparseable. This method should never return
* null; otherwise lots of things will break.
*/
@NotNull
default List<InputRow> parseBatch(T input)
{
return Utils.nullableListOf(parse(input));
}
/**
* Parse an input into an {@link InputRow}. Returns null if this input should be thrown away, or throws
* {@code ParseException} if the input is unparseable.
*/
@Deprecated
@Nullable
InputRow parse(T input);
default InputRow parse(T input)
{
return null;
}
ParseSpec getParseSpec();
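A consumer-side sketch (again an assumption, not code from this commit): callers that previously handled a single, possibly-null InputRow now iterate the returned list and drop null entries. The helper class and method name below are hypothetical.

import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ParseBatchCaller
{
  // Hypothetical helper: parse one raw input and keep only the rows that should be ingested.
  public static List<InputRow> parseAndFilter(ByteBufferInputRowParser parser, ByteBuffer input)
  {
    final List<InputRow> kept = new ArrayList<>();
    for (InputRow row : parser.parseBatch(input)) {
      // Per the javadoc above, entries may be null for rows that should be thrown away.
      if (row != null) {
        kept.add(row);
      }
    }
    return kept;
  }
}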

View File

@ -21,6 +21,7 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.data.input.InputRow;
@ -45,7 +46,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
}
@Override
public InputRow parse(Map<String, Object> theMap)
public List<InputRow> parseBatch(Map<String, Object> theMap)
{
final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions()
? parseSpec.getDimensionsSpec().getDimensionNames()
@ -74,7 +75,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
throw new ParseException(e, "Unparseable timestamp found!");
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap));
}
@JsonProperty

View File

@ -21,8 +21,11 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import java.util.List;
/**
*/
public class NoopInputRowParser implements InputRowParser<InputRow>
@ -38,9 +41,9 @@ public class NoopInputRowParser implements InputRowParser<InputRow>
}
@Override
public InputRow parse(InputRow input)
public List<InputRow> parseBatch(InputRow input)
{
return input;
return ImmutableList.of(input);
}
@JsonProperty

View File

@ -23,8 +23,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.common.parsers.Parser;
@ -34,6 +36,7 @@ import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
/**
@ -72,9 +75,9 @@ public class StringInputRowParser implements ByteBufferInputRowParser
}
@Override
public InputRow parse(ByteBuffer input)
public List<InputRow> parseBatch(ByteBuffer input)
{
return parseMap(buildStringKeyMap(input));
return Utils.nullableListOf(parseMap(buildStringKeyMap(input)));
}
@JsonProperty
@ -149,7 +152,7 @@ public class StringInputRowParser implements ByteBufferInputRowParser
private Map<String, Object> parseString(@Nullable String inputString)
{
initializeParser();
return parser.parse(inputString);
return parser.parseToMap(inputString);
}
@Nullable
@ -159,6 +162,6 @@ public class StringInputRowParser implements ByteBufferInputRowParser
if (theMap == null) {
return null;
}
return mapParser.parse(theMap);
return Iterators.getOnlyElement(mapParser.parseBatch(theMap).iterator());
}
}

View File

@ -48,7 +48,7 @@ public class TimeAndDimsParseSpec extends ParseSpec
return new Parser<String, Object>()
{
@Override
public Map<String, Object> parse(String input)
public Map<String, Object> parseToMap(String input)
{
throw new UnsupportedOperationException("not supported");
}

View File

@ -60,9 +60,9 @@ public class InputRowParserSerdeTest
jsonMapper.writeValueAsBytes(parser),
ByteBufferInputRowParser.class
);
final InputRow parsed = parser2.parse(
final InputRow parsed = parser2.parseBatch(
ByteBuffer.wrap(StringUtils.toUtf8("{\"foo\":\"x\",\"bar\":\"y\",\"qux\":\"z\",\"timestamp\":\"2000\"}"))
);
).get(0);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
@ -101,14 +101,14 @@ public class InputRowParserSerdeTest
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
);
final InputRow parsed = parser2.parse(
final InputRow parsed = parser2.parseBatch(
ImmutableMap.<String, Object>of(
"foo", "x",
"bar", "y",
"qux", "z",
"timeposix", "1"
)
);
).get(0);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
@ -130,7 +130,7 @@ public class InputRowParserSerdeTest
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
);
final InputRow parsed = parser2.parse(
final InputRow parsed = parser2.parseBatch(
ImmutableMap.<String, Object>of(
"timemillis", 1412705931123L,
"toobig", 123E64,
@ -138,7 +138,7 @@ public class InputRowParserSerdeTest
"long", 123456789000L,
"values", Lists.newArrayList(1412705931123L, 123.456, 123E45, "hello")
)
);
).get(0);
Assert.assertEquals(ImmutableList.of("foo", "values"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of(), parsed.getDimension("foo"));
Assert.assertEquals(
@ -170,11 +170,11 @@ public class InputRowParserSerdeTest
ByteBufferInputRowParser.class
);
final InputRow parsed = parser2.parse(
final InputRow parsed = parser2.parseBatch(
ByteBuffer.wrap(
"{\"foo\":\"x\",\"bar\":\"y\",\"qux\":\"z\",\"timestamp\":\"3000\"}".getBytes(charset)
)
);
).get(0);
return parsed;
}

View File

@ -45,7 +45,7 @@ public class JSONLowercaseParseSpecTest
)
);
Parser parser = spec.makeParser();
Map<String, Object> event = parser.parse("{\"timestamp\":\"2015-01-01\",\"A\":\"foo\"}");
Map<String, Object> event = parser.parseToMap("{\"timestamp\":\"2015-01-01\",\"A\":\"foo\"}");
Assert.assertEquals("foo", event.get("a"));
}
}

View File

@ -69,7 +69,7 @@ public class JSONParseSpecTest
expected.put("jq_omg2", null);
final Parser<String, Object> parser = parseSpec.makeParser();
final Map<String, Object> parsedRow = parser.parse("{\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}");
final Map<String, Object> parsedRow = parser.parseToMap("{\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}");
Assert.assertNotNull(parsedRow);
Assert.assertEquals(expected, parsedRow);
Assert.assertNull(parsedRow.get("bar"));

View File

@ -81,7 +81,7 @@ public class JavaScriptParseSpecTest
);
final Parser<String, Object> parser = spec.makeParser();
final Map<String, Object> obj = parser.parse("x-y");
final Map<String, Object> obj = parser.parseToMap("x-y");
Assert.assertEquals(ImmutableMap.of("one", "x", "two", "y"), obj);
}

View File

@ -91,7 +91,7 @@ public class FlattenJSONBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> baseline(final Blackhole blackhole)
{
Map<String, Object> parsed = flatParser.parse(flatInputs.get(flatCounter));
Map<String, Object> parsed = flatParser.parseToMap(flatInputs.get(flatCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
@ -104,7 +104,7 @@ public class FlattenJSONBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> flatten(final Blackhole blackhole)
{
Map<String, Object> parsed = nestedParser.parse(nestedInputs.get(nestedCounter));
Map<String, Object> parsed = nestedParser.parseToMap(nestedInputs.get(nestedCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
@ -117,7 +117,7 @@ public class FlattenJSONBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> jqflatten(final Blackhole blackhole)
{
Map<String, Object> parsed = jqParser.parse(jqInputs.get(jqCounter));
Map<String, Object> parsed = jqParser.parseToMap(jqInputs.get(jqCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
@ -130,7 +130,7 @@ public class FlattenJSONBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> preflattenNestedParser(final Blackhole blackhole)
{
Map<String, Object> parsed = fieldDiscoveryParser.parse(flatInputs.get(nestedCounter));
Map<String, Object> parsed = fieldDiscoveryParser.parseToMap(flatInputs.get(nestedCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}
@ -143,7 +143,7 @@ public class FlattenJSONBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Map<String, Object> forcedRootPaths(final Blackhole blackhole)
{
Map<String, Object> parsed = forcedPathParser.parse(flatInputs.get(nestedCounter));
Map<String, Object> parsed = forcedPathParser.parseToMap(flatInputs.get(nestedCounter));
for (String s : parsed.keySet()) {
blackhole.consume(parsed.get(s));
}

View File

@ -40,9 +40,9 @@ public class FlattenJSONBenchmarkUtilTest
Parser nestedParser = eventGen.getNestedParser();
Parser jqParser = eventGen.getJqParser();
Map<String, Object> event = flatParser.parse(newEvent);
Map<String, Object> event2 = nestedParser.parse(newEvent2);
Map<String, Object> event3 = jqParser.parse(newEvent2); // reuse the same event as "nested"
Map<String, Object> event = flatParser.parseToMap(newEvent);
Map<String, Object> event2 = nestedParser.parseToMap(newEvent2);
Map<String, Object> event3 = jqParser.parseToMap(newEvent2); // reuse the same event as "nested"
checkEvent1(event);
checkEvent2(event2);

View File

@ -33,6 +33,7 @@ import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -198,10 +199,14 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
return new Firehose()
{
private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
@Override
public boolean hasMore()
{
if (nextIterator.hasNext()) {
return true;
}
boolean hasMore = false;
DruidPullRequest earliestPullRequest = null;
@ -252,15 +257,19 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
@Override
public InputRow nextRow()
{
if (nextIterator.hasNext()) {
return nextIterator.next();
}
for (Map.Entry<MessageQueue, ConcurrentSkipListSet<MessageExt>> entry : messageQueueTreeSetMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
MessageExt message = entry.getValue().pollFirst();
InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody()));
nextIterator = theParser.parseBatch(ByteBuffer.wrap(message.getBody())).iterator();
windows
.computeIfAbsent(entry.getKey(), k -> new ConcurrentSkipListSet<>())
.add(message.getQueueOffset());
return inputRow;
return nextIterator.next();
}
}
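The firehose changes in this commit all follow the same buffering pattern: hold the current batch in an iterator and drain it before pulling the next raw message. A minimal standalone sketch of that pattern (a hypothetical reader, not tied to the Firehose interface, and assuming each batch holds at least one entry, as the default parseBatch does):

import com.google.common.collect.Iterators;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Queue;

public class BufferedRowReader
{
  private final ByteBufferInputRowParser parser;
  private final Queue<ByteBuffer> messages;
  private Iterator<InputRow> nextIterator = Iterators.emptyIterator();

  public BufferedRowReader(ByteBufferInputRowParser parser, Queue<ByteBuffer> messages)
  {
    this.parser = parser;
    this.messages = messages;
  }

  public boolean hasMore()
  {
    return nextIterator.hasNext() || !messages.isEmpty();
  }

  public InputRow nextRow()
  {
    if (!nextIterator.hasNext()) {
      // Pull the next raw message and parse the whole batch at once.
      nextIterator = parser.parseBatch(messages.remove()).iterator();
    }
    return nextIterator.next();
  }
}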

View File

@ -22,6 +22,7 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.parsers.ParseException;
@ -37,6 +38,7 @@ import io.druid.java.util.common.StringUtils;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@ -175,6 +177,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
private volatile boolean stopped;
private volatile BytesMessageWithOffset msg = null;
private volatile InputRow row = null;
private volatile Iterator<InputRow> nextIterator = Iterators.emptyIterator();
{
lastOffsetPartitions = Maps.newHashMap();
@ -202,14 +205,18 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
try {
row = null;
while (row == null) {
if (!nextIterator.hasNext()) {
if (msg != null) {
lastOffsetPartitions.put(msg.getPartition(), msg.offset());
}
msg = messageQueue.take();
final byte[] message = msg.message();
row = message == null ? null : firehoseParser.parse(ByteBuffer.wrap(message));
nextIterator = message == null
? Iterators.emptyIterator()
: firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator();
continue;
}
row = nextIterator.next();
}
}
catch (InterruptedException e) {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.InputRow;
@ -74,7 +75,7 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
@SuppressWarnings("ArgumentParameterSwap")
@Override
public InputRow parse(OrcStruct input)
public List<InputRow> parseBatch(OrcStruct input)
{
Map<String, Object> map = Maps.newHashMap();
List<? extends StructField> fields = oip.getAllStructFieldRefs();
@ -106,7 +107,7 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(map);
return new MapBasedInputRow(dateTime, dimensions, map);
return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
}
private List getListObject(ListObjectInspector listObjectInspector, Object listObject)

View File

@ -99,7 +99,7 @@ public class DruidOrcInputFormatTest
OrcStruct data = (OrcStruct) reader.getCurrentValue();
MapBasedInputRow row = (MapBasedInputRow) parser.parse(data);
MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
Assert.assertTrue(row.getEvent().keySet().size() == 4);
Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());

View File

@ -172,7 +172,7 @@ public class OrcHadoopInputRowParserTest
);
oi.setStructFieldData(struct, oi.getStructFieldRef("col6"), null);
final InputRow row = parser.parse(struct);
final InputRow row = parser.parseBatch(struct).get(0);
Assert.assertEquals("timestamp", DateTimes.of("2000-01-01"), row.getTimestamp());
Assert.assertEquals("col1", "foo", row.getRaw("col1"));
Assert.assertEquals("col2", ImmutableList.of("foo", "bar"), row.getRaw("col2"));

View File

@ -20,6 +20,7 @@ package io.druid.data.input.parquet;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
@ -77,7 +78,7 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
* imitate avro extension {@link io.druid.data.input.avro.AvroParsers#parseGenericRecord(GenericRecord, ParseSpec, ObjectFlattener)}
*/
@Override
public InputRow parse(GenericRecord record)
public List<InputRow> parseBatch(GenericRecord record)
{
// Map the record to a map
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, binaryAsString);
@ -97,7 +98,7 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
}
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap));
}
@JsonProperty

View File

@ -57,7 +57,10 @@ public class DruidParquetInputTest
// field not read, should return null
assertEquals(data.get("added"), null);
assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
assertEquals(config.getParser().parse(data).getDimension("page").get(0), "Gypsy Danger");
assertEquals(
((List<InputRow>) config.getParser().parseBatch(data)).get(0).getDimension("page").get(0),
"Gypsy Danger"
);
}
@Test
@ -70,7 +73,7 @@ public class DruidParquetInputTest
config.intoConfiguration(job);
GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
InputRow row = config.getParser().parse(data);
InputRow row = ((List<InputRow>) config.getParser().parseBatch(data)).get(0);
// without binaryAsString: true, the value would be something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
@ -133,7 +136,7 @@ public class DruidParquetInputTest
while (reader.nextKeyValue()) {
reader.nextKeyValue();
GenericRecord data = (GenericRecord) reader.getCurrentValue();
records.add(parser.parse(data));
records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
}
return records;

View File

@ -21,6 +21,7 @@ package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@ -45,6 +46,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -117,7 +119,9 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
) throws Exception
{
this.connectionFactory = connectionFactory == null
? connectionFactoryCOMPAT == null ? JacksonifiedConnectionFactory.makeDefaultConnectionFactory() : connectionFactoryCOMPAT
? connectionFactoryCOMPAT == null
? JacksonifiedConnectionFactory.makeDefaultConnectionFactory()
: connectionFactoryCOMPAT
: connectionFactory;
this.config = config == null ? RabbitMQFirehoseConfig.makeDefaultConfig() : config;
@ -190,10 +194,10 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
return new Firehose()
{
/**
* Storing the latest delivery as a member variable should be safe since this will only be run
* Storing the latest row as a member variable should be safe since this will only be run
* by a single thread.
*/
private Delivery delivery;
private InputRow nextRow;
/**
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
@ -201,19 +205,29 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
*/
private long lastDeliveryTag;
private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
@Override
public boolean hasMore()
{
delivery = null;
nextRow = null;
try {
if (nextIterator.hasNext()) {
nextRow = nextIterator.next();
return true;
}
// Wait for the next delivery. This will block until something is available.
delivery = consumer.nextDelivery();
final Delivery delivery = consumer.nextDelivery();
if (delivery != null) {
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
nextIterator = firehoseParser.parseBatch(ByteBuffer.wrap(delivery.getBody())).iterator();
if (nextIterator.hasNext()) {
nextRow = nextIterator.next();
// If delivery is non-null, we report that there is something more to process.
return true;
}
}
}
catch (InterruptedException e) {
// A little unclear on how we should handle this.
@ -230,13 +244,13 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
@Override
public InputRow nextRow()
{
if (delivery == null) {
if (nextRow == null) {
//Just making sure.
log.wtf("I have nothing in delivery. Method hasMore() should have returned false.");
return null;
}
return firehoseParser.parse(ByteBuffer.wrap(delivery.getBody()));
return nextRow;
}
@Override

View File

@ -22,12 +22,14 @@ package io.druid.data.input.thrift;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.metamx.common.IAE;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.parsers.Parser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@ -37,10 +39,9 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import io.druid.java.util.common.parsers.Parser;
/**
* 1. load thrift class from classpath or provided jar
* 2. deserialize content bytes and serialize to json
@ -89,7 +90,7 @@ public class ThriftInputRowParser implements InputRowParser<Object>
@Override
public InputRow parse(Object input)
public List<InputRow> parseBatch(Object input)
{
if (parser == null) {
// parser should be created when it is really used to avoid unnecessary initialization of the underlying
@ -137,13 +138,13 @@ public class ThriftInputRowParser implements InputRowParser<Object>
throw new IAE("something wrong with your Thrift?");
}
Map<String, Object> record = parser.parse(json);
Map<String, Object> record = parser.parseToMap(json);
return new MapBasedInputRow(
return ImmutableList.of(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(),
record
);
));
}
@Override

View File

@ -146,17 +146,17 @@ public class ThriftInputRowParserTest
expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage("JavaScript is disabled");
parser.parse(ByteBuffer.allocate(1));
parser.parseBatch(ByteBuffer.allocate(1)).get(0);
}
public void serializationAndTest(ThriftInputRowParser parser, byte[] bytes) throws TException
{
ByteBuffer buffer = ByteBuffer.wrap(bytes);
InputRow row1 = parser.parse(buffer);
InputRow row1 = parser.parseBatch(buffer).get(0);
assertTrue(row1.getDimension("title").get(0).equals("title"));
InputRow row2 = parser.parse(new BytesWritable(bytes));
InputRow row2 = parser.parseBatch(new BytesWritable(bytes)).get(0);
assertTrue(row2.getDimension("lastName").get(0).equals("last"));
}
}

View File

@ -26,6 +26,8 @@ import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.avro.generic.GenericRecord;
import java.util.List;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
@ -44,7 +46,7 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
}
@Override
public InputRow parse(GenericRecord record)
public List<InputRow> parseBatch(GenericRecord record)
{
return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener);
}

View File

@ -28,6 +28,7 @@ import io.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.avro.generic.GenericRecord;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
public class AvroStreamInputRowParser implements ByteBufferInputRowParser
@ -48,7 +49,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
}
@Override
public InputRow parse(ByteBuffer input)
public List<InputRow> parseBatch(ByteBuffer input)
{
return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, avroFlattener);
}

View File

@ -27,6 +27,8 @@ import io.druid.java.util.common.parsers.ObjectFlattener;
import io.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.avro.generic.GenericRecord;
import java.util.List;
public class AvroParsers
{
private AvroParsers()
@ -50,12 +52,12 @@ public class AvroParsers
return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
}
public static InputRow parseGenericRecord(
public static List<InputRow> parseGenericRecord(
GenericRecord record,
ParseSpec parseSpec,
ObjectFlattener<GenericRecord> avroFlattener
)
{
return new MapInputRowParser(parseSpec).parse(avroFlattener.flatten(record));
return new MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record));
}
}

View File

@ -82,7 +82,7 @@ public class AvroHadoopInputRowParserTest
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
InputRow inputRow = parser2.parse(record);
InputRow inputRow = parser2.parseBatch(record).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS);
}

View File

@ -214,7 +214,7 @@ public class AvroStreamInputRowParserTest
// write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
InputRow inputRow = parser2.parse(ByteBuffer.wrap(out.toByteArray()));
InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS);
}
@ -255,7 +255,7 @@ public class AvroStreamInputRowParserTest
// write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
InputRow inputRow = parser2.parse(ByteBuffer.wrap(out.toByteArray()));
InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS);
}

View File

@ -23,6 +23,7 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -40,6 +41,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -106,10 +108,12 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser
return new Firehose()
{
Iterator<InputRow> nextIterator = Iterators.emptyIterator();
@Override
public boolean hasMore()
{
return iter.hasNext();
return nextIterator.hasNext() || iter.hasNext();
}
@Nullable
@ -117,13 +121,16 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser
public InputRow nextRow()
{
try {
if (!nextIterator.hasNext()) {
final byte[] message = iter.next().message();
if (message == null) {
return null;
}
nextIterator = theParser.parseBatch(ByteBuffer.wrap(message)).iterator();
}
return theParser.parse(ByteBuffer.wrap(message));
return nextIterator.next();
}
catch (InvalidMessageException e) {

View File

@ -39,6 +39,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -65,6 +66,7 @@ import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
@ -82,6 +84,7 @@ import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
@ -103,6 +106,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@ -119,6 +123,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -660,8 +665,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try {
final byte[] valueBytes = record.value();
final InputRow row = valueBytes == null ? null : parser.parse(ByteBuffer.wrap(valueBytes));
final List<InputRow> rows = valueBytes == null
? Utils.nullableListOf((InputRow) null)
: parser.parseBatch(ByteBuffer.wrap(valueBytes));
boolean isPersistRequired = false;
for (InputRow row : rows) {
if (row != null && withinMinMaxRecordTime(row)) {
SequenceMetadata sequenceToUse = null;
for (SequenceMetadata sequence : sequences) {
@ -683,7 +692,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
row,
sequenceToUse.getSequenceName(),
committerSupplier,
true
// skip segment lineage check as there will always be one segment
// for combination of sequence and segment granularity.
// It is necessary to skip it as the task puts messages polled from all the
// assigned Kafka partitions into a single sequence. The ordering of
// messages among replica tasks across assigned partitions is not guaranteed
// messages among replica tasks across assigned partitions is not guaranteed
// which may cause replica tasks to ask for segments with different interval
// in different order which might cause SegmentAllocateAction to fail.
true,
// do not allow incremental persists to happen until all the rows from this batch
// of rows are indexed
false
);
if (addResult.isOk()) {
@ -701,6 +713,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
sequenceToCheckpoint = sequenceToUse;
}
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
@ -713,6 +726,28 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
fireDepartmentMetrics.incrementThrownAway();
}
}
if (isPersistRequired) {
Futures.addCallback(
driver.persistAsync(committerSupplier.get()),
new FutureCallback<Object>()
{
@Override
public void onSuccess(@Nullable Object result)
{
log.info("Persist completed with metadata [%s]", result);
}
@Override
public void onFailure(Throwable t)
{
log.error("Persist failed, dying");
throwableAtomicReference.set(t);
}
}
);
}
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
throw e;
@ -1045,34 +1080,50 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try {
final byte[] valueBytes = record.value();
final InputRow row = valueBytes == null ? null : parser.parse(ByteBuffer.wrap(valueBytes));
final List<InputRow> rows = valueBytes == null
? Utils.nullableListOf((InputRow) null)
: parser.parseBatch(ByteBuffer.wrap(valueBytes));
boolean isPersistRequired = false;
final Map<String, Set<SegmentIdentifier>> segmentsToMoveOut = new HashMap<>();
for (InputRow row : rows) {
if (row != null && withinMinMaxRecordTime(row)) {
final String sequenceName = sequenceNames.get(record.partition());
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceName,
committerSupplier
committerSupplier,
false,
false
);
if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of AppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
driver.moveSegmentOut(sequenceName, ImmutableList.of(addResult.getSegmentIdentifier()));
segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
.add(addResult.getSegmentIdentifier());
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
fireDepartmentMetrics.incrementProcessed();
} else {
fireDepartmentMetrics.incrementThrownAway();
}
}
if (isPersistRequired) {
driver.persist(committerSupplier.get());
}
segmentsToMoveOut.entrySet().forEach(sequenceSegments -> driver.moveSegmentOut(
sequenceSegments.getKey(),
sequenceSegments.getValue().stream().collect(Collectors.toList())
));
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
throw e;

View File

@ -91,7 +91,7 @@ public class MapPopulator<K, V>
if (lines == Integer.MAX_VALUE) {
throw new ISE("Cannot read more than %,d lines", Integer.MAX_VALUE);
}
final Map<K, V> kvMap = parser.parse(line);
final Map<K, V> kvMap = parser.parseToMap(line);
map.putAll(kvMap);
lines++;
entries += kvMap.size();

View File

@ -213,9 +213,9 @@ public class UriExtractionNamespace implements ExtractionNamespace
}
@Override
public Map<String, String> parse(String input)
public Map<String, String> parseToMap(String input)
{
final Map<String, Object> inner = delegate.parse(input);
final Map<String, Object> inner = delegate.parseToMap(input);
final String k = Preconditions.checkNotNull(
inner.get(key),
"Key column [%s] missing data in line [%s]",
@ -612,7 +612,7 @@ public class UriExtractionNamespace implements ExtractionNamespace
parser = new Parser<String, String>()
{
@Override
public Map<String, String> parse(String input)
public Map<String, String> parseToMap(String input)
{
try {
return jsonFactory.createParser(input).readValueAs(JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING);

View File

@ -87,7 +87,7 @@ public class UriExtractionNamespaceTest
"col3"
), "col2", "col3"
);
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A,B,C"));
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A,B,C"));
}
@Test(expected = IllegalArgumentException.class)
@ -100,7 +100,7 @@ public class UriExtractionNamespaceTest
"col3"
), "col2", "col3ADFSDF"
);
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A,B,C"));
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A,B,C"));
}
@Test(expected = NullPointerException.class)
@ -113,7 +113,7 @@ public class UriExtractionNamespaceTest
"col3"
), "col2", "col3"
);
Map<String, String> map = parser.getParser().parse("A");
Map<String, String> map = parser.getParser().parseToMap("A");
}
@Test
@ -125,7 +125,7 @@ public class UriExtractionNamespaceTest
null, "col2",
"col3"
);
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A|B|C"));
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A|B|C"));
}
@Test
@ -137,7 +137,7 @@ public class UriExtractionNamespaceTest
"\\u0002", "col2",
"col3"
);
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A\\u0001B\\u0001C"));
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A\\u0001B\\u0001C"));
}
@Test(expected = IllegalArgumentException.class)
@ -149,8 +149,8 @@ public class UriExtractionNamespaceTest
null, "col2",
"col3"
);
Map<String, String> map = parser.getParser().parse("A,B,C");
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A,B,C"));
Map<String, String> map = parser.getParser().parseToMap("A,B,C");
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A,B,C"));
}
@ -163,8 +163,8 @@ public class UriExtractionNamespaceTest
null, "col2",
"col3"
);
Map<String, String> map = parser.getParser().parse("A");
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("A,B,C"));
Map<String, String> map = parser.getParser().parseToMap("A");
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("A,B,C"));
}
@Test
@ -180,7 +180,7 @@ public class UriExtractionNamespaceTest
Assert.assertEquals(
ImmutableMap.of("B", "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%s\":\"B\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -204,7 +204,7 @@ public class UriExtractionNamespaceTest
Assert.assertEquals(
ImmutableMap.of("B", "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%sDFSDFDS\":\"B\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -227,7 +227,7 @@ public class UriExtractionNamespaceTest
Assert.assertEquals(
ImmutableMap.of("B", "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%sDFSDFDS\":\"B\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -250,7 +250,7 @@ public class UriExtractionNamespaceTest
Assert.assertEquals(
ImmutableMap.of("B", "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%sDFSDFDS\":\"B\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -273,7 +273,7 @@ public class UriExtractionNamespaceTest
Assert.assertEquals(
ImmutableMap.of("B", "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%sDFSDFDS\":\"B\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -289,7 +289,7 @@ public class UriExtractionNamespaceTest
UriExtractionNamespace.ObjectMapperFlatDataParser parser = new UriExtractionNamespace.ObjectMapperFlatDataParser(
registerTypes(new ObjectMapper())
);
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parse("{\"B\":\"C\"}"));
Assert.assertEquals(ImmutableMap.of("B", "C"), parser.getParser().parseToMap("{\"B\":\"C\"}"));
}
@Test
@ -398,7 +398,7 @@ public class UriExtractionNamespaceTest
"num string value",
ImmutableMap.of("B", nString),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%s\":\"B\", \"%s\":\"%d\", \"FOO\":\"BAR\"}",
keyField,
@ -411,7 +411,7 @@ public class UriExtractionNamespaceTest
"num string key",
ImmutableMap.of(nString, "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%s\":\"%d\", \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -424,7 +424,7 @@ public class UriExtractionNamespaceTest
"num value",
ImmutableMap.of("B", nString),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%s\":\"B\", \"%s\":%d, \"FOO\":\"BAR\"}",
keyField,
@ -437,7 +437,7 @@ public class UriExtractionNamespaceTest
"num key",
ImmutableMap.of(nString, "C"),
parser.getParser()
.parse(
.parseToMap(
StringUtils.format(
"{\"%s\":%d, \"%s\":\"C\", \"FOO\":\"BAR\"}",
keyField,
@ -458,7 +458,7 @@ public class UriExtractionNamespaceTest
final String nString = StringUtils.format("%d", n);
Assert.assertEquals(
ImmutableMap.of("key", nString),
parser.getParser().parse(StringUtils.format("{\"key\":%d}", n))
parser.getParser().parseToMap(StringUtils.format("{\"key\":%d}", n))
);
}
}

View File

@ -22,6 +22,7 @@ package io.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
@ -41,6 +42,7 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -79,7 +81,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
}
@Override
public InputRow parse(ByteBuffer input)
public List<InputRow> parseBatch(ByteBuffer input)
{
if (parser == null) {
// parser should be created when it is really used to avoid unnecessary initialization of the underlying
@ -95,12 +97,12 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
throw new ParseException(e, "Protobuf message could not be parsed");
}
Map<String, Object> record = parser.parse(json);
return new MapBasedInputRow(
Map<String, Object> record = parser.parseToMap(json);
return ImmutableList.of(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(),
record
);
));
}
private Descriptor getDescriptor(String descriptorFilePath)

View File

@ -156,7 +156,7 @@ public class ProtobufInputRowParserTest
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
System.out.println(row);
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
@ -199,7 +199,7 @@ public class ProtobufInputRowParserTest
expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage("JavaScript is disabled");
parser.parse(ByteBuffer.allocate(1));
parser.parseBatch(ByteBuffer.allocate(1)).get(0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected)

View File

@ -19,19 +19,21 @@
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
{
@ -61,9 +63,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
{
try {
final InputRow inputRow;
final List<InputRow> inputRows;
try {
inputRow = parseInputRow(value, parser);
inputRows = parseInputRow(value, parser);
}
catch (ParseException e) {
if (reportParseExceptions) {
@ -74,38 +76,38 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
return; // we're ignoring this invalid row
}
for (InputRow inputRow : inputRows) {
if (inputRow == null) {
// Throw away null rows from the parser.
log.debug("Throwing away row [%s]", value);
return;
continue;
}
if (!granularitySpec.bucketIntervals().isPresent()
|| granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
.isPresent()) {
innerMap(inputRow, context, reportParseExceptions);
}
}
}
catch (RuntimeException e) {
throw new RE(e, "Failure on row[%s]", value);
}
}
@Nullable
public static InputRow parseInputRow(Object value, InputRowParser parser)
private static List<InputRow> parseInputRow(Object value, InputRowParser parser)
{
if (parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
//HadoopyStringInputRowParser can handle this and this special case is not needed
//except for backward compatibility
return ((StringInputRowParser) parser).parse(value.toString());
return Utils.nullableListOf(((StringInputRowParser) parser).parse(value.toString()));
} else if (value instanceof InputRow) {
return (InputRow) value;
return ImmutableList.of((InputRow) value);
} else if (value == null) {
// Pass through nulls so they get thrown away.
return null;
return Utils.nullableListOf((InputRow) null);
} else {
return parser.parse(value);
return parser.parseBatch(value);
}
}

View File

@ -20,16 +20,17 @@
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.IAE;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
@ -43,13 +44,13 @@ public class HadoopyStringInputRowParser implements InputRowParser<Object>
}
@Override
public InputRow parse(Object input)
public List<InputRow> parseBatch(Object input)
{
if (input instanceof Text) {
return parser.parse(((Text) input).toString());
return ImmutableList.of(parser.parse(((Text) input).toString()));
} else if (input instanceof BytesWritable) {
BytesWritable valueBytes = (BytesWritable) input;
return parser.parse(ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()));
return parser.parseBatch(ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()));
} else {
throw new IAE("can't convert type [%s] to InputRow", input.getClass().getName());
}

View File

@ -130,7 +130,7 @@ public class YeOldePlumberSchool implements PlumberSchool
return -1;
}
final int numRows = sink.add(row);
final int numRows = sink.add(row, false);
if (!sink.canAppendRow()) {
persist(committerSupplier.get());

View File

@ -197,7 +197,7 @@ public class RealtimeIndexTaskTest
public InputRow nextRow()
{
synchronized (this) {
final InputRow row = parser.parse(queue.remove(0));
final InputRow row = parser.parseBatch(queue.remove(0)).get(0);
if (row != null && row.getRaw(FAIL_DIM) != null) {
throw new ParseException(FAIL_DIM);
}

View File

@ -164,7 +164,7 @@ public class IngestSegmentFirehoseFactoryTest
.buildOnheap();
for (Integer i = 0; i < MAX_ROWS; ++i) {
index.add(ROW_PARSER.parse(buildRow(i.longValue())));
index.add(ROW_PARSER.parseBatch(buildRow(i.longValue())).get(0));
}
if (!persistDir.mkdirs() && !persistDir.exists()) {

View File

@ -23,8 +23,11 @@ package io.druid.java.util.common.collect;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class Utils
@ -33,7 +36,8 @@ public class Utils
{
Preconditions.checkArgument(values.length == keys.length,
"number of values[%s] different than number of keys[%s]",
values.length, keys.length);
values.length, keys.length
);
return zipMapPartial(keys, values);
}
@ -42,7 +46,8 @@ public class Utils
{
Preconditions.checkArgument(values.length <= keys.length,
"number of values[%s] exceeds number of keys[%s]",
values.length, keys.length);
values.length, keys.length
);
Map<K, V> retVal = new LinkedHashMap<>();
@ -53,8 +58,10 @@ public class Utils
return retVal;
}
/** Create a Map from iterables of keys and values. Will throw an exception if there are more keys than values,
* or more values than keys. */
/**
* Create a Map from iterables of keys and values. Will throw an exception if there are more keys than values,
* or more values than keys.
*/
public static <K, V> Map<K, V> zipMap(Iterable<K> keys, Iterable<V> values)
{
Map<K, V> retVal = new LinkedHashMap<>();
@ -67,20 +74,24 @@ public class Utils
Preconditions.checkArgument(valsIter.hasNext(),
"number of values[%s] less than number of keys, broke on key[%s]",
retVal.size(), key);
retVal.size(), key
);
retVal.put(key, valsIter.next());
}
Preconditions.checkArgument(!valsIter.hasNext(),
"number of values[%s] exceeds number of keys[%s]",
retVal.size() + Iterators.size(valsIter), retVal.size());
retVal.size() + Iterators.size(valsIter), retVal.size()
);
return retVal;
}
/** Create a Map from iterables of keys and values. If there are more keys than values, or more values than keys,
* the excess will be omitted. */
/**
* Create a Map from iterables of keys and values. If there are more keys than values, or more values than keys,
* the excess will be omitted.
*/
public static <K, V> Map<K, V> zipMapPartial(Iterable<K> keys, Iterable<V> values)
{
Map<K, V> retVal = new LinkedHashMap<>();
@ -101,4 +112,20 @@ public class Utils
return retVal;
}
@SafeVarargs
public static <T> List<T> nullableListOf(@Nullable T... elements)
{
final List<T> list;
if (elements == null) {
list = new ArrayList<>(1);
list.add(null);
} else {
list = new ArrayList<>(elements.length);
for (T element : elements) {
list.add(element);
}
}
return list;
}
}
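A quick illustration of the new helper's behavior (an assumed usage example, mirroring how the deprecated parse result is bridged into parseBatch above):

import io.druid.java.util.common.collect.Utils;

import java.util.List;

public class NullableListOfExample
{
  public static void main(String[] args)
  {
    // A null element is kept, producing a one-element list containing null. This is how the
    // default parseBatch(...) bridges a deprecated parse(...) that returned null.
    List<String> single = Utils.nullableListOf((String) null);
    System.out.println(single.size());   // 1
    System.out.println(single.get(0));   // null

    // Non-null arguments are copied into a new mutable list in order.
    List<String> pair = Utils.nullableListOf("a", "b");
    System.out.println(pair);            // [a, b]
  }
}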

View File

@ -127,7 +127,7 @@ public abstract class AbstractFlatTextFormatParser implements Parser<String, Obj
}
@Override
public Map<String, Object> parse(final String input)
public Map<String, Object> parseToMap(final String input)
{
if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) {
throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. "

View File

@ -67,7 +67,7 @@ public class JSONPathParser implements Parser<String, Object>
* @return A map of field names and values
*/
@Override
public Map<String, Object> parse(String input)
public Map<String, Object> parseToMap(String input)
{
try {
JsonNode document = mapper.readValue(input, JsonNode.class);

View File

@ -109,7 +109,7 @@ public class JSONToLowerParser implements Parser<String, Object>
}
@Override
public Map<String, Object> parse(String input)
public Map<String, Object> parseToMap(String input)
{
try {
Map<String, Object> map = new LinkedHashMap<>();

View File

@ -74,7 +74,7 @@ public class JavaScriptParser implements Parser<String, Object>
}
@Override
public Map<String, Object> parse(String input)
public Map<String, Object> parseToMap(String input)
{
try {
final Object compiled = fn.apply(input);

View File

@ -43,7 +43,7 @@ public interface Parser<K, V>
* @throws ParseException if the String cannot be parsed
*/
@Nullable
Map<K, V> parse(String input);
Map<K, V> parseToMap(String input);
/**
* Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be

View File

@ -40,7 +40,7 @@ public class Parsers
public Map<K, V> apply(String input)
{
try {
return p.parse(input);
return p.parseToMap(input);
}
catch (Exception e) {
return null;

View File

@ -84,7 +84,7 @@ public class RegexParser implements Parser<String, Object>
}
@Override
public Map<String, Object> parse(String input)
public Map<String, Object> parseToMap(String input)
{
try {
final Matcher matcher = compiled.matcher(input);

View File

@ -37,9 +37,9 @@ public class ToLowerCaseParser implements Parser<String, Object>
}
@Override
public Map parse(String input)
public Map parseToMap(String input)
{
Map<String, Object> line = baseParser.parse(input);
Map<String, Object> line = baseParser.parseToMap(input);
Map<String, Object> retVal = Maps.newLinkedHashMap();
for (Map.Entry<String, Object> entry : line.entrySet()) {
String k = StringUtils.toLowerCase(entry.getKey());

View File

@ -86,7 +86,7 @@ public class FlatTextFormatParserTest
final String header = concat(format, "time", "value1", "value2");
final Parser<String, Object> parser = parserFactory.get(format, header);
final String body = concat(format, "hello", "world", "foo");
final Map<String, Object> jsonMap = parser.parse(body);
final Map<String, Object> jsonMap = parser.parseToMap(body);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"),
@ -99,7 +99,7 @@ public class FlatTextFormatParserTest
{
final Parser<String, Object> parser = parserFactory.get(format);
final String body = concat(format, "hello", "world", "foo");
final Map<String, Object> jsonMap = parser.parse(body);
final Map<String, Object> jsonMap = parser.parseToMap(body);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
@ -120,9 +120,9 @@ public class FlatTextFormatParserTest
};
int index;
for (index = 0; index < skipHeaderRows; index++) {
Assert.assertNull(parser.parse(body[index]));
Assert.assertNull(parser.parseToMap(body[index]));
}
final Map<String, Object> jsonMap = parser.parse(body[index]);
final Map<String, Object> jsonMap = parser.parseToMap(body[index]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
@ -139,8 +139,8 @@ public class FlatTextFormatParserTest
concat(format, "time", "value1", "value2"),
concat(format, "hello", "world", "foo")
};
Assert.assertNull(parser.parse(body[0]));
final Map<String, Object> jsonMap = parser.parse(body[1]);
Assert.assertNull(parser.parseToMap(body[0]));
final Map<String, Object> jsonMap = parser.parseToMap(body[1]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"),
@ -157,8 +157,8 @@ public class FlatTextFormatParserTest
concat(format, "time", "", "value2", ""),
concat(format, "hello", "world", "foo", "bar")
};
Assert.assertNull(parser.parse(body[0]));
final Map<String, Object> jsonMap = parser.parse(body[1]);
Assert.assertNull(parser.parseToMap(body[0]));
final Map<String, Object> jsonMap = parser.parseToMap(body[1]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("time", "hello", "column_2", "world", "value2", "foo", "column_4", "bar"),
@ -175,8 +175,8 @@ public class FlatTextFormatParserTest
concat(format, "time", "value1", "value2"),
concat(format, "hello", "world", "foo")
};
Assert.assertNull(parser.parse(body[0]));
Map<String, Object> jsonMap = parser.parse(body[1]);
Assert.assertNull(parser.parseToMap(body[0]));
Map<String, Object> jsonMap = parser.parseToMap(body[1]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"),
@ -188,8 +188,8 @@ public class FlatTextFormatParserTest
concat(format, "time", "value1", "value2", "value3"),
concat(format, "hello", "world", "foo", "bar")
};
Assert.assertNull(parser.parse(body2[0]));
jsonMap = parser.parse(body2[1]);
Assert.assertNull(parser.parseToMap(body2[0]));
jsonMap = parser.parseToMap(body2[1]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo", "value3", "bar"),
@ -212,7 +212,7 @@ public class FlatTextFormatParserTest
concat(format, "header", "line", "2"),
concat(format, "hello", "world", "foo")
};
parser.parse(body[0]);
parser.parseToMap(body[0]);
}
private static class FlatTextFormatParserFactory

View File

@ -57,7 +57,7 @@ public class JSONPathParserTest
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(json);
final Map<String, Object> jsonMap = jsonParser.parseToMap(json);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("one", "foo", "two", ImmutableList.of("bar", "baz"), "three", "qux"),
@ -70,7 +70,7 @@ public class JSONPathParserTest
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(numbersJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(numbersJson);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("five", 5.0, "six", 6L, "many", 1234567878900L, "toomany", 1.23456789E21),
@ -83,7 +83,7 @@ public class JSONPathParserTest
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(whackyCharacterJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(whackyCharacterJson);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("one", "foo?"),
@ -113,7 +113,7 @@ public class JSONPathParserTest
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(nestedJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(nestedJson);
// Root fields
Assert.assertEquals(ImmutableList.of(1L, 2L, 3L), jsonMap.get("baz"));
@ -174,7 +174,7 @@ public class JSONPathParserTest
fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-met-array", ".met.a"));
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(nestedJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(nestedJson);
// Root fields
Assert.assertEquals("text", jsonMap.get("simpleVal"));
@ -211,7 +211,7 @@ public class JSONPathParserTest
thrown.expectMessage("Cannot have duplicate field definition: met-array");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(nestedJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(nestedJson);
}
@Test
@ -225,7 +225,7 @@ public class JSONPathParserTest
thrown.expectMessage("Cannot have duplicate field definition: met-array");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(nestedJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(nestedJson);
}
@Test
@ -237,6 +237,6 @@ public class JSONPathParserTest
thrown.expectMessage("Unable to parse row [" + notJson + "]");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Map<String, Object> jsonMap = jsonParser.parse(notJson);
final Map<String, Object> jsonMap = jsonParser.parseToMap(notJson);
}
}
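These tests keep the field-spec plumbing unchanged while only the method name moves to parseToMap. A hedged sketch of flattening a nested document the same way, reusing the constructors shown in the test above; imports and the sample JSON are illustrative.

static Map<String, Object> flattenNestedJson()
{
  final List<JSONPathFieldSpec> fields = new ArrayList<>();
  // jq-style path pulling a nested array up to a top-level field
  fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "met-a", ".met.a"));
  // useFieldDiscovery = true also surfaces the root-level fields automatically
  final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
  return jsonParser.parseToMap("{\"simpleVal\":\"text\",\"met\":{\"a\":[1,2,3]}}");
}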


@ -41,7 +41,7 @@ public class JavaScriptParserTest
);
String data = "foo-val1";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("one", "foo");
builder.put("two", "val1");
@ -62,7 +62,7 @@ public class JavaScriptParserTest
);
String data = "val1-val2";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("one", Lists.newArrayList("val1", "val2"));
Assert.assertEquals(


@ -66,7 +66,7 @@ public class RegexParserTest
);
String data = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - \"GET /mybucket?versioning HTTP/1.1\" 200 - 113 - 7 - \"-\" \"S3Console/0.4\" -";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("Bucket Owner", "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be");
builder.put("Bucket", "mybucket");
@ -127,7 +127,7 @@ public class RegexParserTest
);
String data = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:01:00 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 7B4A0FABBEXAMPLE REST.GET.VERSIONING - \"GET /mybucket?versioning HTTP/1.1\" 200 - 139 139 27 26 \"-\" \"() { foo;};echo; /bin/bash -c \"expr 299663299665 / 3; echo 333:; uname -a; echo 333:; id;\"\" -";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("Bucket Owner", "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be");
builder.put("Bucket", "mybucket");
@ -175,7 +175,7 @@ public class RegexParserTest
);
String data = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket@mybucket2";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("Bucket Owner", "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be");
builder.put("Bucket", Lists.newArrayList("mybucket", "mybucket2"));
@ -199,7 +199,7 @@ public class RegexParserTest
);
String data = "1a2";
final Map<String, Object> parsed = parser.parse(data);
final Map<String, Object> parsed = parser.parseToMap(data);
ImmutableMap.Builder builder = ImmutableMap.builder();
builder.put("column_1", Lists.newArrayList("1", "2"));
@ -226,6 +226,6 @@ public class RegexParserTest
);
String data = "BBBB";
parser.parse(data);
parser.parseToMap(data);
}
}


@ -441,7 +441,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException;
public abstract int getLastRowIndex();
@ -493,6 +494,11 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row) throws IndexSizeExceededException
{
return add(row, false);
}
public int add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(
@ -503,14 +509,15 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
numEntries,
key,
in,
rowSupplier
rowSupplier,
skipMaxRowsInMemoryCheck
);
updateMaxIngestedTime(row.getTimestamp());
return rv;
}
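The extra flag exists so that a caller ingesting a multi-row batch from parseBatch is not stopped by the maxRowsInMemory guard halfway through the batch. A hedged sketch of one way a caller can use it (the addAll logic introduced by this patch may differ in detail); the index and the row batch are placeholders.

import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;

import java.util.List;

class BatchAddExample
{
  // Add one parsed batch as a unit: only the first row is subject to the
  // in-memory row cap, the rest of the batch is forced in so it is never half-ingested.
  static int addBatch(IncrementalIndex<?> index, List<InputRow> rows) throws IndexSizeExceededException
  {
    int numRowsAfterAdd = 0;
    boolean first = true;
    for (InputRow row : rows) {
      if (row == null) {
        continue;                                // parseBatch may emit nulls for rows to discard
      }
      numRowsAfterAdd = index.add(row, !first);  // skipMaxRowsInMemoryCheck for all but the first row
      first = false;
    }
    return numRowsAfterAdd;
  }
}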
@VisibleForTesting
TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException
TimeAndDims toTimeAndDims(InputRow row)
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {


@ -146,7 +146,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck // ignored, we always want to check this for offheap
) throws IndexSizeExceededException
{
ByteBuffer aggBuffer;


@ -109,7 +109,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException
{
final int priorIndex = facts.getPriorIndex(key);
@ -128,7 +129,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
concurrentSet(rowIndex, aggs);
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX) {
if (numEntries.get() >= maxRowCount
&& facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX
&& !skipMaxRowsInMemoryCheck) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
final int prev = facts.putIfAbsent(key, rowIndex);


@ -23,6 +23,9 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import java.util.List;
import java.util.stream.Collectors;
public class TransformingInputRowParser<T> implements InputRowParser<T>
{
private final InputRowParser<T> parser;
@ -42,9 +45,9 @@ public class TransformingInputRowParser<T> implements InputRowParser<T>
}
@Override
public InputRow parse(final T row)
public List<InputRow> parseBatch(final T row)
{
return transformer.transform(parser.parse(row));
return parser.parseBatch(row).stream().map(transformer::transform).collect(Collectors.toList());
}
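Since the transform is mapped over each element, a TransformSpec-decorated parser keeps the one-output-per-input shape; rows rejected by the transform's filter come back as null elements, which the TransformSpec tests later in this diff rely on. A small usage sketch, with the spec, base parser and raw map as placeholders (imports follow the TransformSpec test file):

static List<InputRow> parseAndTransform(
    TransformSpec transformSpec,
    InputRowParser<Map<String, Object>> baseParser,
    Map<String, Object> rawRow
)
{
  final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(baseParser);
  // keep only the rows that survived the transform's filter
  return parser.parseBatch(rawRow).stream()
               .filter(Objects::nonNull)
               .collect(Collectors.toList());
}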
@Override


@ -25,6 +25,8 @@ import io.druid.data.input.impl.StringInputRowParser;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
public class TransformingStringInputRowParser extends StringInputRowParser
{
@ -43,9 +45,9 @@ public class TransformingStringInputRowParser extends StringInputRowParser
}
@Override
public InputRow parse(final ByteBuffer input)
public List<InputRow> parseBatch(final ByteBuffer input)
{
return transformer.transform(super.parse(input));
return super.parseBatch(input).stream().map(transformer::transform).collect(Collectors.toList());
}
@Nullable


@ -331,7 +331,7 @@ public class DoubleStorageTest
getStreamOfEvents().forEach(o -> {
try {
index.add(ROW_PARSER.parse((Map) o));
index.add(ROW_PARSER.parseBatch((Map<String, Object>) o).get(0));
}
catch (IndexSizeExceededException e) {
Throwables.propagate(e);


@ -89,12 +89,12 @@ public class SchemaEvolutionTest
)
);
return ImmutableList.of(
parser.parse(ImmutableMap.<String, Object>of("t", "2000-01-01", "c1", "9", "c2", ImmutableList.of("a"))),
parser.parse(ImmutableMap.<String, Object>of("t", "2000-01-02", "c1", "10.1", "c2", ImmutableList.of())),
parser.parse(ImmutableMap.<String, Object>of("t", "2000-01-03", "c1", "2", "c2", ImmutableList.of(""))),
parser.parse(ImmutableMap.<String, Object>of("t", "2001-01-01", "c1", "1", "c2", ImmutableList.of("a", "c"))),
parser.parse(ImmutableMap.<String, Object>of("t", "2001-01-02", "c1", "4", "c2", ImmutableList.of("abc"))),
parser.parse(ImmutableMap.<String, Object>of("t", "2001-01-03", "c1", "5"))
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2000-01-01", "c1", "9", "c2", ImmutableList.of("a"))).get(0),
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2000-01-02", "c1", "10.1", "c2", ImmutableList.of())).get(0),
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2000-01-03", "c1", "2", "c2", ImmutableList.of(""))).get(0),
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2001-01-01", "c1", "1", "c2", ImmutableList.of("a", "c"))).get(0),
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2001-01-02", "c1", "4", "c2", ImmutableList.of("abc"))).get(0),
parser.parseBatch(ImmutableMap.<String, Object>of("t", "2001-01-03", "c1", "5")).get(0)
);
}
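Most call sites in the rest of this diff follow the same mechanical migration: a parser that is known to emit exactly one row per input replaces the deprecated parse(input) with parseBatch(input).get(0). A small helper capturing that pattern:

import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;

class SingleRowParsing
{
  // Drop-in replacement for the deprecated parse() when the parser is known to
  // produce exactly one row per input (e.g. the map-based parsers used throughout these tests).
  static <T> InputRow parseSingle(InputRowParser<T> parser, T input)
  {
    return parser.parseBatch(input).get(0);
  }
}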


@ -35,6 +35,7 @@ import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
@ -447,7 +448,7 @@ public class AggregationTestHelper
//InputRowsParser<String>
index.add(((StringInputRowParser) parser).parse((String) row));
} else {
index.add(parser.parse(row));
index.add(((List<InputRow>) parser.parseBatch(row)).get(0));
}
}


@ -58,12 +58,12 @@ public class AndFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "0"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "0")).get(0)
);
public AndFilterTest(


@ -59,14 +59,14 @@ public class BoundFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.<String>of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "-1000", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "-10.012", "dim2", ImmutableList.of("d")))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.<String>of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "-1000", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "-10.012", "dim2", ImmutableList.of("d"))).get(0)
);
public BoundFilterTest(


@ -65,16 +65,16 @@ public class ColumnComparisonFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("1", "2"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("3"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "1", "dim2", ImmutableList.of("4", "5"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "5", "dim2", ImmutableList.of("4", "5"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "1")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "a")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "8", "dim1", 8L)),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "9", "dim1", 1.234f, "dim2", 1.234f))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("1", "2"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("3"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "1", "dim2", ImmutableList.of("4", "5"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "5", "dim2", ImmutableList.of("4", "5"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "1")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "a")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "8", "dim1", 8L)).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "9", "dim1", 1.234f, "dim2", 1.234f)).get(0)
);
public ColumnComparisonFilterTest(


@ -81,7 +81,7 @@ public class ExpressionFilterTest extends BaseFilterTest
ImmutableMap.of("dim0", "7", "dim1", 7L, "dim2", 7.0f, "dim3", "a"),
ImmutableMap.of("dim0", "8", "dim1", 8L, "dim2", 8.0f, "dim3", 8L),
ImmutableMap.of("dim0", "9", "dim1", 9L, "dim2", 9.0f, "dim3", 1.234f, "dim4", 1.234f)
).stream().map(PARSER::parse).collect(Collectors.toList());
).stream().map(e -> PARSER.parseBatch(e).get(0)).collect(Collectors.toList());
public ExpressionFilterTest(
String testName,


@ -167,16 +167,16 @@ public class FilterPartitionTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "B453B411", "dim2", ImmutableList.of("c", "d", "e"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "HELLO", "dim2", ImmutableList.of("foo"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "8", "dim1", "abc", "dim2", ImmutableList.of("bar"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "9", "dim1", "1", "dim2", ImmutableList.of("foo", "bar")))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "6", "dim1", "B453B411", "dim2", ImmutableList.of("c", "d", "e"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "7", "dim1", "HELLO", "dim2", ImmutableList.of("foo"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "8", "dim1", "abc", "dim2", ImmutableList.of("bar"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "9", "dim1", "1", "dim2", ImmutableList.of("foo", "bar"))).get(0)
);
public FilterPartitionTest(


@ -93,12 +93,12 @@ public class FloatAndDoubleFilteringTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.of("ts", 1L, "dim0", "1", "flt", 1.0f, "dbl", 1.0d)),
PARSER.parse(ImmutableMap.of("ts", 2L, "dim0", "2", "flt", 2.0f, "dbl", 2.0d)),
PARSER.parse(ImmutableMap.of("ts", 3L, "dim0", "3", "flt", 3.0f, "dbl", 3.0d)),
PARSER.parse(ImmutableMap.of("ts", 4L, "dim0", "4", "flt", 4.0f, "dbl", 4.0d)),
PARSER.parse(ImmutableMap.of("ts", 5L, "dim0", "5", "flt", 5.0f, "dbl", 5.0d)),
PARSER.parse(ImmutableMap.of("ts", 6L, "dim0", "6", "flt", 6.0f, "dbl", 6.0d))
PARSER.parseBatch(ImmutableMap.of("ts", 1L, "dim0", "1", "flt", 1.0f, "dbl", 1.0d)).get(0),
PARSER.parseBatch(ImmutableMap.of("ts", 2L, "dim0", "2", "flt", 2.0f, "dbl", 2.0d)).get(0),
PARSER.parseBatch(ImmutableMap.of("ts", 3L, "dim0", "3", "flt", 3.0f, "dbl", 3.0d)).get(0),
PARSER.parseBatch(ImmutableMap.of("ts", 4L, "dim0", "4", "flt", 4.0f, "dbl", 4.0d)).get(0),
PARSER.parseBatch(ImmutableMap.of("ts", 5L, "dim0", "5", "flt", 5.0f, "dbl", 5.0d)).get(0),
PARSER.parseBatch(ImmutableMap.of("ts", 6L, "dim0", "6", "flt", 6.0f, "dbl", 6.0d)).get(0)
);
public FloatAndDoubleFilteringTest(


@ -63,12 +63,12 @@ public class InFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "a", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "b", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "c", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "d", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "e", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "f", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "a", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "b", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "c", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "d", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "e", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "f", "dim1", "abc")).get(0)
);
public InFilterTest(


@ -64,12 +64,12 @@ public class InvalidFilteringTest extends BaseFilterTest
)
);
private static final InputRow row0 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "", "dim2", ImmutableList.of("a", "b")));
private static final InputRow row1 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "10", "dim2", ImmutableList.of()));
private static final InputRow row2 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "2", "dim2", ImmutableList.of("")));
private static final InputRow row3 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "1", "dim2", ImmutableList.of("a")));
private static final InputRow row4 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "def", "dim2", ImmutableList.of("c")));
private static final InputRow row5 = PARSER.parse(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "dim1", "abc"));
private static final InputRow row0 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0);
private static final InputRow row1 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "10", "dim2", ImmutableList.of())).get(0);
private static final InputRow row2 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "2", "dim2", ImmutableList.of(""))).get(0);
private static final InputRow row3 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0);
private static final InputRow row4 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0);
private static final InputRow row5 = PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "dim1", "abc")).get(0);
private static final List<InputRow> ROWS = ImmutableList.of(
row0,


@ -64,12 +64,12 @@ public class JavaScriptFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0)
);
public JavaScriptFilterTest(


@ -56,12 +56,12 @@ public class LikeFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "foo")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "foobar")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "bar")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "foobarbaz")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "foo%bar"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "foo")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "foobar")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "bar")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "foobarbaz")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "foo%bar")).get(0)
);
public LikeFilterTest(


@ -86,16 +86,16 @@ public class LongFilteringTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "lng", 1L, "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "lng", 2L, "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "lng", 3L, "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "lng", 4L, "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "lng", 5L, "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "lng", 6L, "dim1", "abc")),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 7L, "dim0", "7", "lng", 100000000L, "dim1", "xyz")),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 8L, "dim0", "8", "lng", 100000001L, "dim1", "xyz")),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 9L, "dim0", "9", "lng", -25L, "dim1", "ghi")),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 10L, "dim0", "10", "lng", -100000001L, "dim1", "qqq"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "lng", 1L, "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "lng", 2L, "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "lng", 3L, "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "lng", 4L, "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "lng", 5L, "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 6L, "dim0", "6", "lng", 6L, "dim1", "abc")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 7L, "dim0", "7", "lng", 100000000L, "dim1", "xyz")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 8L, "dim0", "8", "lng", 100000001L, "dim1", "xyz")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 9L, "dim0", "9", "lng", -25L, "dim1", "ghi")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 10L, "dim0", "10", "lng", -100000001L, "dim1", "qqq")).get(0)
);
public LongFilteringTest(


@ -56,12 +56,12 @@ public class NotFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5")).get(0)
);
public NotFilterTest(


@ -62,12 +62,12 @@ public class RegexFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "abdef", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "abdef", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0)
);
public RegexFilterTest(


@ -64,12 +64,12 @@ public class SearchQueryFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "abdef", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "abdef", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0)
);
public SearchQueryFilterTest(


@ -67,12 +67,12 @@ public class SelectorFilterTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"), "dim6", "2017-07-25")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25")),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"), "dim6", "2017-07-25")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25")).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("dim0", "5", "dim1", "abc")).get(0)
);
public SelectorFilterTest(


@ -80,12 +80,12 @@ public class TimeFilteringTest extends BaseFilterTest
);
private static final List<InputRow> ROWS = ImmutableList.of(
PARSER.parse(ImmutableMap.<String, Object>of("ts", 0L, "dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "10", "dim2", ImmutableList.of())),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))),
PARSER.parse(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "abc"))
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 0L, "dim0", "0", "dim1", "", "dim2", ImmutableList.of("a", "b"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 1L, "dim0", "1", "dim1", "10", "dim2", ImmutableList.of())).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 2L, "dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 3L, "dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 4L, "dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
PARSER.parseBatch(ImmutableMap.<String, Object>of("ts", 5L, "dim0", "5", "dim1", "abc")).get(0)
);
public TimeFilteringTest(


@ -175,7 +175,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
Supplier<InputRow> rowSupplier,
boolean skipMaxRowsInMemoryCheck // ignore for benchmark
) throws IndexSizeExceededException
{


@ -82,7 +82,7 @@ public interface IndexerMetadataStorageCoordinator
* @param interval interval for which to allocate a segment
* @param maxVersion use this version if we have no better version to use. The returned segment identifier may
* have a version lower than this one, but will not have one higher.
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
@ -127,6 +127,7 @@ public interface IndexerMetadataStorageCoordinator
* Removes entry for 'dataSource' from the dataSource metadata table.
*
* @param dataSource identifier
*
* @return true if the entry was deleted, false otherwise
*/
boolean deleteDataSourceMetadata(String dataSource);
@ -136,6 +137,7 @@ public interface IndexerMetadataStorageCoordinator
*
* @param dataSource identifier
* @param dataSourceMetadata value to set
*
* @return true if the entry was reset, false otherwise
*/
boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException;


@ -53,6 +53,16 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
*/
Object startJob();
/**
* Same as {@link #add(SegmentIdentifier, InputRow, Supplier, boolean)}, with allowIncrementalPersists set to true
*/
default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
throws IndexSizeExceededException, SegmentNotWritableException
{
return add(identifier, row, committerSupplier, true);
}
/**
* Add a row. Must not be called concurrently from multiple threads.
* <p>
@ -68,13 +78,25 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* @param identifier the segment into which this row should be added
* @param row the row to add
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
* if {@param allowIncrementalPersists} is set to false then this will not be used as no
* persist will be done automatically
* @param allowIncrementalPersists indicate whether automatic persist should be performed or not if required.
* If this flag is set to false then the return value should have
* {@link AppenderatorAddResult#isPersistRequired} set to true if persist was skipped
* because of this flag and it is assumed that the responsibility of calling
* {@link #persistAll(Committer)} is on the caller.
*
* @return positive number indicating how many summarized rows exist in this segment so far
* @return {@link AppenderatorAddResult}
*
* @throws IndexSizeExceededException if this row cannot be added because it is too large
* @throws SegmentNotWritableException if the requested segment is known, but has been closed
*/
int add(SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier)
AppenderatorAddResult add(
SegmentIdentifier identifier,
InputRow row,
Supplier<Committer> committerSupplier,
boolean allowIncrementalPersists
)
throws IndexSizeExceededException, SegmentNotWritableException;
/**
@ -192,4 +214,39 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* in background thread then it does not cause any problems.
*/
void closeNow();
/**
* Result of {@link Appenderator#add(SegmentIdentifier, InputRow, Supplier, boolean)} containing following information
* - SegmentIdentifier - identifier of segment to which rows are being added
* - int - positive number indicating how many summarized rows exist in this segment so far and
* - boolean - true if {@param allowIncrementalPersists} is set to false and persist is required; false otherwise
*/
class AppenderatorAddResult
{
private final SegmentIdentifier segmentIdentifier;
private final int numRowsInSegment;
private final boolean isPersistRequired;
AppenderatorAddResult(SegmentIdentifier identifier, int numRowsInSegment, boolean isPersistRequired)
{
this.segmentIdentifier = identifier;
this.numRowsInSegment = numRowsInSegment;
this.isPersistRequired = isPersistRequired;
}
SegmentIdentifier getSegmentIdentifier()
{
return segmentIdentifier;
}
int getNumRowsInSegment()
{
return numRowsInSegment;
}
boolean isPersistRequired()
{
return isPersistRequired;
}
}
}
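With the four-argument add, a caller can take over persist scheduling entirely: pass allowIncrementalPersists = false and trigger the persist itself once the result reports one is pending. A hedged sketch of that loop; it is assumed to live in the same package as Appenderator (the AppenderatorAddResult accessors above are package-private), and the identifier, rows and committer supplier are placeholders.

import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;

import java.util.List;

class ManualPersistExample
{
  static void addBatch(
      Appenderator appenderator,
      SegmentIdentifier identifier,
      List<InputRow> rows,
      Supplier<Committer> committerSupplier
  ) throws IndexSizeExceededException, SegmentNotWritableException
  {
    boolean isPersistRequired = false;
    for (InputRow row : rows) {
      // allowIncrementalPersists = false: add() will never persist on its own,
      // it only flags that a persist was skipped.
      final Appenderator.AppenderatorAddResult result =
          appenderator.add(identifier, row, committerSupplier, false);
      isPersistRequired = isPersistRequired || result.isPersistRequired();
    }
    if (isPersistRequired) {
      // The caller owns the persist when incremental persists are disabled.
      appenderator.persistAll(committerSupplier.get());
    }
  }
}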


@ -39,10 +39,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
@ -61,6 +61,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -250,31 +251,38 @@ public class AppenderatorDriver implements Closeable
appenderator.clear();
}
/**
* Add a row. Must not be called concurrently from multiple threads.
*
* @param row the row to add
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
*
* @return segment to which this row was added, or null if segment allocator returned null for this row
*
* @throws IOException if there is an I/O error while allocating or writing to a segment
*/
public AppenderatorDriverAddResult add(
final InputRow row,
final String sequenceName,
final Supplier<Committer> committerSupplier
) throws IOException
{
return add(row, sequenceName, committerSupplier, false);
return add(row, sequenceName, committerSupplier, false, true);
}
/**
* Add a row. Must not be called concurrently from multiple threads.
*
* @param row the row to add
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
* if {@param allowIncrementalPersists} is set to false then this will not be used
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
* @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period
* threshold is hit
*
* @return {@link AppenderatorDriverAddResult}
*
* @throws IOException if there is an I/O error while allocating or writing to a segment
*/
public AppenderatorDriverAddResult add(
final InputRow row,
final String sequenceName,
final Supplier<Committer> committerSupplier,
final boolean skipSegmentLineageCheck
final boolean skipSegmentLineageCheck,
final boolean allowIncrementalPersists
) throws IOException
{
Preconditions.checkNotNull(row, "row");
@ -285,8 +293,18 @@ public class AppenderatorDriver implements Closeable
if (identifier != null) {
try {
final int numRowsInMemory = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier));
return AppenderatorDriverAddResult.ok(identifier, numRowsInMemory, appenderator.getTotalRowCount());
final Appenderator.AppenderatorAddResult result = appenderator.add(
identifier,
row,
wrapCommitterSupplier(committerSupplier),
allowIncrementalPersists
);
return AppenderatorDriverAddResult.ok(
identifier,
result.getNumRowsInSegment(),
appenderator.getTotalRowCount(),
result.isPersistRequired()
);
}
catch (SegmentNotWritableException e) {
throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier);
@ -299,7 +317,7 @@ public class AppenderatorDriver implements Closeable
/**
* Persist all data indexed through this driver so far. Blocks until complete.
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
*
@ -322,6 +340,22 @@ public class AppenderatorDriver implements Closeable
}
}
/**
* Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata.
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
*
* @return future containing commitMetadata persisted
*/
public ListenableFuture<Object> persistAsync(final Committer committer)
throws InterruptedException, ExecutionException
{
log.info("Persisting data asynchronously");
return appenderator.persistAll(wrapCommitter(committer));
}
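The driver-level counterpart pairs isPersistRequired from the add result with the new persistAsync, so an indexing task can batch adds and persist without blocking its ingestion loop. A hedged sketch, assumed to live alongside AppenderatorDriver; the driver, sequence name, rows and committer supplier are placeholders.

import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;

class AsyncPersistExample
{
  // Add a batch with incremental persists disabled, then issue a single
  // asynchronous persist if any add reported that one is pending.
  static ListenableFuture<Object> addThenPersist(
      AppenderatorDriver driver,
      List<InputRow> rows,
      String sequenceName,
      Supplier<Committer> committerSupplier
  ) throws IOException, InterruptedException, ExecutionException
  {
    boolean persistNeeded = false;
    for (InputRow row : rows) {
      final AppenderatorDriverAddResult result = driver.add(
          row,
          sequenceName,
          committerSupplier,
          false,  // skipSegmentLineageCheck, per the javadoc above
          false   // allowIncrementalPersists: this caller persists explicitly
      );
      if (!result.isOk()) {
        throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
      }
      persistNeeded = persistNeeded || result.isPersistRequired();
    }
    // A future of the persisted commit metadata, or null if nothing needed persisting.
    return persistNeeded ? driver.persistAsync(committerSupplier.get()) : null;
  }
}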
/**
* Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which
* waits until the hand off completes.
@ -442,6 +476,8 @@ public class AppenderatorDriver implements Closeable
*
* @param row input row
* @param sequenceName sequenceName for potential segment allocation
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @return identifier, or null
*
@ -615,7 +651,7 @@ public class AppenderatorDriver implements Closeable
* Execute a task in background to publish the given segments. The task blocks until complete.
* Retries forever on transient failures, but may exit early on permanent failures.
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param publisher publisher to use for this set of segments
* @param wrappedCommitter committer representing all data that has been added so far


@ -25,38 +25,43 @@ import io.druid.data.input.InputRow;
import javax.annotation.Nullable;
/**
* Result of {@link AppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the
* segment which the InputRow is added to and the number of rows in that segment.
* Result of {@link AppenderatorDriver#add(InputRow, String, Supplier, boolean)}. It contains the identifier of the
* segment which the InputRow is added to, the number of rows in that segment and if persist is required because either
* maxRowsInMemory or intermediate persist period threshold is hit.
*/
public class AppenderatorDriverAddResult
{
private final SegmentIdentifier segmentIdentifier;
private final int numRowsInSegment;
private final long totalNumRowsInAppenderator;
private final boolean isPersistRequired;
public static AppenderatorDriverAddResult ok(
SegmentIdentifier segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator
long totalNumRowsInAppenderator,
boolean isPersistRequired
)
{
return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment, totalNumRowsInAppenderator);
return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment, totalNumRowsInAppenderator, isPersistRequired);
}
public static AppenderatorDriverAddResult fail()
{
return new AppenderatorDriverAddResult(null, 0, 0);
return new AppenderatorDriverAddResult(null, 0, 0, false);
}
private AppenderatorDriverAddResult(
@Nullable SegmentIdentifier segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator
long totalNumRowsInAppenderator,
boolean isPersistRequired
)
{
this.segmentIdentifier = segmentIdentifier;
this.numRowsInSegment = numRowsInSegment;
this.totalNumRowsInAppenderator = totalNumRowsInAppenderator;
this.isPersistRequired = isPersistRequired;
}
public boolean isOk()
@ -78,4 +83,9 @@ public class AppenderatorDriverAddResult
{
return totalNumRowsInAppenderator;
}
public boolean isPersistRequired()
{
return isPersistRequired;
}
}


@ -199,10 +199,11 @@ public class AppenderatorImpl implements Appenderator
}
@Override
public int add(
public AppenderatorAddResult add(
final SegmentIdentifier identifier,
final InputRow row,
final Supplier<Committer> committerSupplier
final Supplier<Committer> committerSupplier,
final boolean allowIncrementalPersists
) throws IndexSizeExceededException, SegmentNotWritableException
{
if (!identifier.getDataSource().equals(schema.getDataSource())) {
@ -219,7 +220,7 @@ public class AppenderatorImpl implements Appenderator
final int sinkRowsInMemoryAfterAdd;
try {
sinkRowsInMemoryAfterAdd = sink.add(row);
sinkRowsInMemoryAfterAdd = sink.add(row, !allowIncrementalPersists);
}
catch (IndexSizeExceededException e) {
// Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
@ -237,14 +238,19 @@ public class AppenderatorImpl implements Appenderator
rowsCurrentlyInMemory.addAndGet(numAddedRows);
totalRows.addAndGet(numAddedRows);
boolean isPersistRequired = false;
if (!sink.canAppendRow()
|| System.currentTimeMillis() > nextFlush
|| rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
persistAll(committerSupplier.get());
} else {
isPersistRequired = true;
}
}
return sink.getNumRows();
return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired);
}
@Override


@ -155,7 +155,7 @@ public class AppenderatorPlumber implements Plumber
final int numRows;
try {
numRows = appenderator.add(identifier, row, committerSupplier);
numRows = appenderator.add(identifier, row, committerSupplier).getNumRowsInSegment();
lastCommitterSupplier = committerSupplier;
return numRows;
}


@ -31,6 +31,8 @@ public interface SegmentAllocator
* @param row the event which triggered this allocation request
* @param sequenceName sequenceName for this allocation
* @param previousSegmentId segment identifier returned on the previous call to allocate for your sequenceName
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/


@ -228,7 +228,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.add(parser.parse(event));
rows.addAll(parser.parseBatch(event));
}
try {


@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApi;
@ -42,6 +43,7 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
@ -187,30 +189,36 @@ public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<D
return new Firehose()
{
InputRow nextRow = null;
Iterator<InputRow> nextIterator = Iterators.emptyIterator();
@Override
public boolean hasMore()
{
try {
while (true) {
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.poll(100, TimeUnit.MILLISECONDS);
if (closed) {
return false;
}
if (nextIterator.hasNext()) {
nextRow = nextIterator.next();
if (nextRow != null) {
return true;
}
} else {
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.poll(100, TimeUnit.MILLISECONDS);
if (nextMsg == null) {
continue;
}
try {
nextRow = firehoseParser.parse(nextMsg);
if (nextRow != null) {
return true;
}
nextIterator = firehoseParser.parseBatch(nextMsg).iterator();
}
catch (IllegalArgumentException iae) {
log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
}
}
}
}
catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("interrupted retrieving elements from queue", e);


@ -22,15 +22,16 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableList;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.Pair;
import org.joda.time.DateTime;
import java.util.List;
/**
* <p><b>Example Usage</b></p>
* <p/>
@ -86,9 +87,9 @@ public class IrcInputRowParser implements InputRowParser<Pair<DateTime, ChannelP
}
@Override
public InputRow parse(Pair<DateTime, ChannelPrivMsg> msg)
public List<InputRow> parseBatch(Pair<DateTime, ChannelPrivMsg> msg)
{
return decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText());
return ImmutableList.of(decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText()));
}
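Because a parser can now emit several rows for a single message, the firehose above buffers the parsed batch in an iterator and drains it before polling for the next message. The same shape works for any pull-based reader; a condensed, hedged sketch with the message source and parser as placeholders.

import com.google.common.collect.Iterators;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;

import java.util.Iterator;

class BatchDrainingReader<T>
{
  private final Iterator<T> messages;          // placeholder for the real message source
  private final InputRowParser<T> parser;
  private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
  private InputRow nextRow;

  BatchDrainingReader(Iterator<T> messages, InputRowParser<T> parser)
  {
    this.messages = messages;
    this.parser = parser;
  }

  // Drain the current batch first; only parse a new message once it is exhausted.
  boolean hasMore()
  {
    while (true) {
      if (nextIterator.hasNext()) {
        nextRow = nextIterator.next();
        if (nextRow != null) {                 // skip null placeholder rows
          return true;
        }
      } else if (messages.hasNext()) {
        nextIterator = parser.parseBatch(messages.next()).iterator();
      } else {
        return false;
      }
    }
  }

  InputRow nextRow()
  {
    return nextRow;
  }
}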
@JsonProperty


@ -216,7 +216,7 @@ public class RealtimePlumber implements Plumber
return -1;
}
final int numRows = sink.add(row);
final int numRows = sink.add(row, false);
if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
persist(committerSupplier.get());


@ -26,7 +26,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@ -140,7 +139,7 @@ public class Sink implements Iterable<FireHydrant>
return currHydrant;
}
public int add(InputRow row) throws IndexSizeExceededException
public int add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
if (currHydrant == null) {
throw new IAE("No currHydrant but given row[%s]", row);
@ -155,7 +154,7 @@ public class Sink implements Iterable<FireHydrant>
if (index == null) {
return ADD_FAILED; // the hydrant was swapped without being replaced
}
return index.add(row);
return index.add(row, skipMaxRowsInMemoryCheck);
}
}


@ -169,9 +169,9 @@ public class DataSchemaTest
// Test hack that produces a StringInputRowParser.
final StringInputRowParser parser = (StringInputRowParser) schema.getParser();
final InputRow row1bb = parser.parse(
final InputRow row1bb = parser.parseBatch(
ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"foo\"}".getBytes(Charsets.UTF_8))
);
).get(0);
Assert.assertEquals(DateTimes.of("2000-01-01"), row1bb.getTimestamp());
Assert.assertEquals("foo", row1bb.getRaw("dimA"));
Assert.assertEquals("foofoo", row1bb.getRaw("expr"));
@ -181,9 +181,9 @@ public class DataSchemaTest
Assert.assertEquals("foo", row1string.getRaw("dimA"));
Assert.assertEquals("foofoo", row1string.getRaw("expr"));
final InputRow row2 = parser.parse(
final InputRow row2 = parser.parseBatch(
ByteBuffer.wrap("{\"time\":\"2000-01-01\",\"dimA\":\"x\"}".getBytes(Charsets.UTF_8))
);
).get(0);
Assert.assertNull(row2);
}
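The byte-oriented entry point keeps the same contract: parseBatch on a StringInputRowParser yields one element per input, and a null element indicates the row was dropped by the schema's transform (as the assertion above expects). A minimal helper mirroring the test, with imports following the test file and the schema as a placeholder.

// Decode a UTF-8 JSON payload through the schema's parser and return the single
// resulting row, which may be null if the schema's transform filtered it out.
static InputRow parseUtf8Json(DataSchema schema, String json)
{
  final StringInputRowParser parser = (StringInputRowParser) schema.getParser();
  return parser.parseBatch(ByteBuffer.wrap(json.getBytes(Charsets.UTF_8))).get(0);
}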


@ -80,7 +80,7 @@ public class TransformSpecTest
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
final InputRow row = parser.parseBatch(ROW1).get(0);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch());
@ -108,7 +108,7 @@ public class TransformSpecTest
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
final InputRow row = parser.parseBatch(ROW1).get(0);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01").getMillis(), row.getTimestampFromEpoch());
@ -139,8 +139,8 @@ public class TransformSpecTest
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
Assert.assertNotNull(parser.parse(ROW1));
Assert.assertNull(parser.parse(ROW2));
Assert.assertNotNull(parser.parseBatch(ROW1).get(0));
Assert.assertNull(parser.parseBatch(ROW2).get(0));
}
@Test
@ -154,7 +154,7 @@ public class TransformSpecTest
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
final InputRow row = parser.parseBatch(ROW1).get(0);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("1970-01-01T05:00:00Z"), row.getTimestamp());
@ -172,7 +172,7 @@ public class TransformSpecTest
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parse(ROW1);
final InputRow row = parser.parseBatch(ROW1).get(0);
Assert.assertNotNull(row);
Assert.assertEquals(DateTimes.of("2000-01-01T01:00:00Z"), row.getTimestamp());

View File

@ -1059,7 +1059,7 @@ public class RealtimeManagerTest
return -1;
}
return sink.add(row);
return sink.add(row, false);
}
public Sink getSink(long timestamp)

View File

@ -304,12 +304,13 @@ public class AppenderatorDriverFailTest
}
@Override
public int add(
SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier
public AppenderatorAddResult add(
SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier, boolean allowIncrementalPersists
) throws IndexSizeExceededException, SegmentNotWritableException
{
rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
return ++numRows;
numRows++;
return new AppenderatorAddResult(identifier, numRows, false);
}
@Override
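This stub reflects the revised Appenderator contract: add(...) takes an allowIncrementalPersists flag and returns an AppenderatorAddResult rather than a bare row count. A minimal sketch of the new call shape as the tests below exercise it; identifier, row and committerSupplier are assumed to be set up as in those tests, and the three-argument calls elsewhere in this patch appear to be a convenience overload that defaults allowIncrementalPersists.

// Hedged sketch; getNumRowsInSegment() is the accessor the tests use.
AppenderatorAddResult result = appenderator.add(identifier, row, committerSupplier, true);
int rowsInSegment = result.getNumRowsInSegment();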

View File

@ -83,13 +83,22 @@ public class AppenderatorTest
// add
commitMetadata.put("x", "1");
Assert.assertEquals(1, appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier));
Assert.assertEquals(1,
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "2");
Assert.assertEquals(2, appenderator.add(IDENTIFIERS.get(0), IR("2000", "bar", 2), committerSupplier));
Assert.assertEquals(2,
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bar", 2), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "3");
Assert.assertEquals(1, appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 4), committerSupplier));
Assert.assertEquals(1,
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 4), committerSupplier)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2), sorted(appenderator.getSegments()));
@ -188,6 +197,52 @@ public class AppenderatorTest
}
}
@Test
public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
// Do nothing
}
};
};
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier, false);
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "baz", 1), committerSupplier, false);
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "qux", 1), committerSupplier, false);
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
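In short, the new test pins down that with allowIncrementalPersists set to false the appenderator keeps accepting rows past the in-memory limit (3, per the tester above) and only frees memory on an explicit persist. A compressed restatement of that contract, reusing names from the test; it is a summary sketch, not additional test code.

// No automatic incremental persist when the flag is false...
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier, false);
// ...so rows in memory can exceed maxRowsInMemory until the caller persists explicitly.
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());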
@Test
public void testRestoreFromDisk() throws Exception
{

View File

@ -129,7 +129,8 @@ public class SinkTest
{
return 0;
}
}
},
false
);
FireHydrant currHydrant = sink.getCurrHydrant();
@ -182,7 +183,8 @@ public class SinkTest
{
return 0;
}
}
},
false
);
Assert.assertEquals(currHydrant, swapHydrant);

View File

@ -526,18 +526,18 @@ public class CalciteTests
public static InputRow createRow(final ImmutableMap<String, ?> map)
{
return PARSER.parse((Map<String, Object>) map);
return PARSER.parseBatch((Map<String, Object>) map).get(0);
}
public static InputRow createRow(final Object t, final String dim1, final String dim2, final double m1)
{
return PARSER.parse(
return PARSER.parseBatch(
ImmutableMap.<String, Object>of(
"t", new DateTime(t, ISOChronology.getInstanceUTC()).getMillis(),
"dim1", dim1,
"dim2", dim2,
"m1", m1
)
);
).get(0);
}
}