mirror of https://github.com/apache/druid.git
Reuse transformer in stream indexing (#9625)
* Reuse transformer in stream indexing
* remove unused method
* memoize compiled pattern
This commit is contained in:
parent 7bf1ebb0b8
commit 82ce60b5c1
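
The "memoize compiled pattern" bullet refers to Guava's Suppliers.memoize, which runs the wrapped supplier at most once and caches the result for every later get(). A minimal standalone sketch of the idiom (assumes only Guava on the classpath; MemoizeDemo is an illustrative name, not part of this commit):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import java.util.regex.Pattern;

public class MemoizeDemo
{
  public static void main(String[] args)
  {
    // memoize() compiles the pattern on the first get() and caches the result;
    // later calls return the same Pattern instance without recompiling.
    final Supplier<Pattern> compiled =
        Suppliers.memoize(() -> Pattern.compile("//[^\\r\\n]*[\\r\\n]"));
    System.out.println(compiled.get() == compiled.get()); // prints true
  }
}

RegexInputFormat below applies exactly this idiom to defer and cache Pattern.compile.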
org/apache/druid/data/input/impl/RegexInputFormat.java

@@ -20,7 +20,10 @@
 package org.apache.druid.data.input.impl;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
@@ -29,12 +32,15 @@ import org.apache.druid.data.input.InputRowSchema;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
+import java.util.regex.Pattern;

 public class RegexInputFormat implements InputFormat
 {
   private final String pattern;
   private final String listDelimiter;
   private final List<String> columns;
+  @JsonIgnore
+  private final Supplier<Pattern> compiledPatternSupplier;

   @JsonCreator
   public RegexInputFormat(
@@ -46,6 +52,27 @@ public class RegexInputFormat implements InputFormat
     this.pattern = pattern;
     this.listDelimiter = listDelimiter;
     this.columns = columns;
+    this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern));
   }

+  @JsonProperty
+  public String getPattern()
+  {
+    return pattern;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getListDelimiter()
+  {
+    return listDelimiter;
+  }
+
+  @Nullable
+  @JsonProperty
+  public List<String> getColumns()
+  {
+    return columns;
+  }
+
   @Override
@@ -57,6 +84,6 @@ public class RegexInputFormat implements InputFormat
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns);
+    return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns);
   }
 }
org/apache/druid/data/input/impl/RegexReader.java

@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 public class RegexReader extends TextReader
 {
   private final String pattern;
-  private final Pattern compiled;
+  private final Pattern compiledPattern;
   private final Function<String, Object> multiValueFunction;

   private List<String> columns;
@@ -51,13 +51,14 @@ public class RegexReader extends TextReader
       InputRowSchema inputRowSchema,
       InputEntity source,
       String pattern,
+      Pattern compiledPattern,
       @Nullable String listDelimiter,
       @Nullable List<String> columns
   )
   {
     super(inputRowSchema, source);
     this.pattern = pattern;
-    this.compiled = Pattern.compile(pattern);
+    this.compiledPattern = compiledPattern;
     final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
     this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
     this.columns = columns;
@@ -78,7 +79,7 @@ public class RegexReader extends TextReader
   private Map<String, Object> parseLine(String line)
   {
     try {
-      final Matcher matcher = compiled.matcher(line);
+      final Matcher matcher = compiledPattern.matcher(line);

       if (!matcher.matches()) {
         throw new ParseException("Incorrect Regex: %s . No match found.", pattern);
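
Passing one compiled Pattern from the format into every reader is safe because java.util.regex.Pattern is immutable and thread-safe; only the Matcher created per line holds mutable match state. A self-contained sketch of that split (SharedPatternDemo is illustrative, not part of this commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SharedPatternDemo
{
  // One Pattern, compiled once, shared by every caller.
  private static final Pattern SHARED = Pattern.compile("(\\w+),(\\w+)");

  public static void main(String[] args)
  {
    // Each input line gets its own short-lived Matcher.
    final Matcher matcher = SHARED.matcher("key,value");
    if (matcher.matches()) {
      System.out.println(matcher.group(1)); // prints "key"
    }
  }
}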
org/apache/druid/data/input/impl/RegexInputFormatTest.java (new file)

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RegexInputFormatTest
+{
+  private final ObjectMapper mapper;
+
+  public RegexInputFormatTest()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerSubtypes(new NamedType(RegexInputFormat.class, "regex"));
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final RegexInputFormat expected = new RegexInputFormat(
+        "//[^\\r\\n]*[\\r\\n]",
+        "|",
+        ImmutableList.of("col1", "col2", "col3")
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final RegexInputFormat fromJson = (RegexInputFormat) mapper.readValue(json, InputFormat.class);
+
+    Assert.assertEquals(expected.getPattern(), fromJson.getPattern());
+    Assert.assertEquals(expected.getListDelimiter(), fromJson.getListDelimiter());
+    Assert.assertEquals(expected.getColumns(), fromJson.getColumns());
+  }
+
+  @Test
+  public void testIgnoreCompiledPatternInJson() throws IOException
+  {
+    final RegexInputFormat expected = new RegexInputFormat(
+        "//[^\\r\\n]*[\\r\\n]",
+        "|",
+        ImmutableList.of("col1", "col2", "col3")
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final Map<String, Object> map = mapper.readValue(json, Map.class);
+    Assert.assertFalse(map.containsKey("compiledPattern"));
+  }
+}
org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

@@ -38,11 +38,9 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.LookupNodeService;
@@ -74,7 +72,6 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -206,6 +203,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
   private final SeekableStreamIndexTaskTuningConfig tuningConfig;
   private final InputRowSchema inputRowSchema;
   private final InputFormat inputFormat;
+  @Nullable
   private final InputRowParser<ByteBuffer> parser;
   private final AuthorizerMapper authorizerMapper;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -364,48 +362,24 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
     log.info("Starting with sequences: %s", sequences);
   }

-  private List<InputRow> parseBytes(List<byte[]> valueBytess) throws IOException
-  {
-    if (parser != null) {
-      return parseWithParser(valueBytess);
-    } else {
-      return parseWithInputFormat(valueBytess);
-    }
-  }
-
-  private List<InputRow> parseWithParser(List<byte[]> valueBytess)
-  {
-    final List<InputRow> rows = new ArrayList<>();
-    for (byte[] valueBytes : valueBytess) {
-      rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
-    }
-    return rows;
-  }
-
-  private List<InputRow> parseWithInputFormat(List<byte[]> valueBytess) throws IOException
-  {
-    final List<InputRow> rows = new ArrayList<>();
-    for (byte[] valueBytes : valueBytess) {
-      final InputEntityReader reader = task.getDataSchema().getTransformSpec().decorate(
-          Preconditions.checkNotNull(inputFormat, "inputFormat").createReader(
-              inputRowSchema,
-              new ByteEntity(valueBytes),
-              toolbox.getIndexingTmpDir()
-          )
-      );
-      try (CloseableIterator<InputRow> rowIterator = reader.read()) {
-        rowIterator.forEachRemaining(rows::add);
-      }
-    }
-    return rows;
-  }
-
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
     startTime = DateTimes.nowUtc();
     status = Status.STARTING;

     setToolbox(toolbox);
+
+    // Now we can initialize StreamChunkParser with the given toolbox.
+    final StreamChunkParser parser = new StreamChunkParser(
+        this.parser,
+        new SettableByteEntityReader(
+            inputFormat,
+            inputRowSchema,
+            task.getDataSchema().getTransformSpec(),
+            toolbox.getIndexingTmpDir()
+        )
+    );
+
     initializeSequences();

     if (chatHandlerProvider.isPresent()) {
@@ -657,7 +631,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
       if (valueBytess == null || valueBytess.isEmpty()) {
         rows = Utils.nullableListOf((InputRow) null);
       } else {
-        rows = parseBytes(valueBytess);
+        rows = parser.parse(valueBytess);
       }
       boolean isPersistRequired = false;
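
In the removed parseWithInputFormat, every byte[] record got a freshly decorated reader, and TransformSpec.decorate() built a new Transformer each time; the new code builds one StreamChunkParser per task run instead. A toy counter showing that cost model (TransformerCostDemo and its names are illustrative stand-ins, not Druid's API):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

public class TransformerCostDemo
{
  static final AtomicInteger BUILDS = new AtomicInteger();

  // Stand-in for TransformSpec.toTransformer(): nontrivial setup work.
  static UnaryOperator<String> toTransformer()
  {
    BUILDS.incrementAndGet();
    return UnaryOperator.identity();
  }

  public static void main(String[] args)
  {
    // Old shape: one transformer built per record.
    for (int record = 0; record < 1000; record++) {
      toTransformer().apply("record");
    }
    // New shape: one transformer per task run, reused for every record.
    final UnaryOperator<String> reused = toTransformer();
    for (int record = 0; record < 1000; record++) {
      reused.apply("record");
    }
    System.out.println(BUILDS.get()); // prints 1001: 1000 old-style builds vs 1 reused
  }
}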
org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java (new file)

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A settable {@link InputEntityReader}. This class is intended to be used only for stream parsing
+ * in Kafka or Kinesis indexing.
+ */
+class SettableByteEntityReader implements InputEntityReader
+{
+  private final InputFormat inputFormat;
+  private final InputRowSchema inputRowSchema;
+  private final Transformer transformer;
+  private final File indexingTmpDir;
+
+  private InputEntityReader delegate;
+
+  SettableByteEntityReader(
+      InputFormat inputFormat,
+      InputRowSchema inputRowSchema,
+      TransformSpec transformSpec,
+      File indexingTmpDir
+  )
+  {
+    this.inputFormat = Preconditions.checkNotNull(inputFormat, "inputFormat");
+    this.inputRowSchema = inputRowSchema;
+    this.transformer = transformSpec.toTransformer();
+    this.indexingTmpDir = indexingTmpDir;
+  }
+
+  void setEntity(ByteEntity entity) throws IOException
+  {
+    this.delegate = new TransformingInputEntityReader(
+        // Yes, we are creating a new reader for every stream chunk.
+        // This should be fine as long as initializing a reader is cheap, which it is for now.
+        inputFormat.createReader(inputRowSchema, entity, indexingTmpDir),
+        transformer
+    );
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    return delegate.read();
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+  {
+    return delegate.sample();
+  }
+}
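
SettableByteEntityReader keeps the expensive piece, the Transformer produced by TransformSpec.toTransformer(), in a final field and swaps only the cheap per-chunk entity. A reduced sketch of this settable-delegate shape (simplified types, not Druid's):

import java.util.function.UnaryOperator;

public class SettableReaderSketch
{
  private final UnaryOperator<String> transformer; // built once, like TransformSpec.toTransformer()
  private String entity;

  SettableReaderSketch(UnaryOperator<String> transformer)
  {
    this.transformer = transformer;
  }

  void setEntity(String entity)
  {
    this.entity = entity; // the per-chunk swap is just a field write
  }

  String read()
  {
    return transformer.apply(entity);
  }

  public static void main(String[] args)
  {
    final SettableReaderSketch reader = new SettableReaderSketch(String::toUpperCase);
    reader.setEntity("chunk-1");
    System.out.println(reader.read()); // CHUNK-1
    reader.setEntity("chunk-2");
    System.out.println(reader.read()); // CHUNK-2, same transformer instance
  }
}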
org/apache/druid/indexing/seekablestream/StreamChunkParser.java (new file)

@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
+ * or {@link InputRowParser}. This class will be useful until we remove the deprecated InputRowParser.
+ */
+class StreamChunkParser
+{
+  @Nullable
+  private final InputRowParser<ByteBuffer> parser;
+  private final SettableByteEntityReader byteEntityReader;
+
+  StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
+  {
+    this.parser = parser;
+    this.byteEntityReader = byteEntityReader;
+  }
+
+  List<InputRow> parse(List<byte[]> streamChunk) throws IOException
+  {
+    if (parser != null) {
+      return parseWithParser(parser, streamChunk);
+    } else {
+      return parseWithInputFormat(byteEntityReader, streamChunk);
+    }
+  }
+
+  private static List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    for (byte[] valueBytes : valueBytess) {
+      rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
+    }
+    return rows;
+  }
+
+  private static List<InputRow> parseWithInputFormat(
+      SettableByteEntityReader byteEntityReader,
+      List<byte[]> valueBytess
+  ) throws IOException
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    for (byte[] valueBytes : valueBytess) {
+      byteEntityReader.setEntity(new ByteEntity(valueBytes));
+      try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
+        rowIterator.forEachRemaining(rows::add);
+      }
+    }
+    return rows;
+  }
+}
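
The only branching in StreamChunkParser is the null check on the deprecated parser: when one is configured it wins; otherwise the InputFormat path via SettableByteEntityReader runs. A simplified model of that dispatch (stand-in types, not Druid's):

import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class DispatchSketch
{
  private final UnaryOperator<String> legacyParser; // may be null, like InputRowParser
  private final UnaryOperator<String> formatReader; // fallback, like the SettableByteEntityReader path

  DispatchSketch(UnaryOperator<String> legacyParser, UnaryOperator<String> formatReader)
  {
    this.legacyParser = legacyParser;
    this.formatReader = formatReader;
  }

  List<String> parse(List<String> chunk)
  {
    // Prefer the deprecated parser when present; otherwise use the format path.
    final UnaryOperator<String> chosen = legacyParser != null ? legacyParser : formatReader;
    return chunk.stream().map(chosen).collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    final DispatchSketch parser = new DispatchSketch(null, s -> s.toUpperCase(Locale.ROOT));
    System.out.println(parser.parse(List.of("a", "b"))); // [A, B] via the fallback
  }
}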
org/apache/druid/segment/transform/TransformSpec.java

@@ -22,7 +22,6 @@ package org.apache.druid.segment.transform;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.StringInputRowParser;
@@ -118,11 +117,6 @@ public class TransformSpec
     return new TransformingInputSourceReader(reader, toTransformer());
   }

-  public InputEntityReader decorate(InputEntityReader reader)
-  {
-    return new TransformingInputEntityReader(reader, toTransformer());
-  }
-
   /**
    * Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known
    * signature.