From 82ce60b5c1756a3eda06e075e2071a11feee368c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 6 Apr 2020 16:36:08 -0700 Subject: [PATCH] Reuse transformer in stream indexing (#9625) * Reuse transformer in stream indexing * remove unused method * memoize compiled pattern --- .../data/input/impl/RegexInputFormat.java | 29 ++++++- .../druid/data/input/impl/RegexReader.java | 7 +- .../data/input/impl/RegexInputFormatTest.java | 72 ++++++++++++++++ .../SeekableStreamIndexTaskRunner.java | 54 ++++-------- .../SettableByteEntityReader.java | 84 +++++++++++++++++++ .../seekablestream/StreamChunkParser.java | 81 ++++++++++++++++++ .../segment/transform/TransformSpec.java | 6 -- 7 files changed, 283 insertions(+), 50 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java index 116551669db..c3745bc277a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java @@ -20,7 +20,10 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; @@ -29,12 +32,15 @@ import org.apache.druid.data.input.InputRowSchema; import javax.annotation.Nullable; import 
java.io.File; import java.util.List; +import java.util.regex.Pattern; public class RegexInputFormat implements InputFormat { private final String pattern; private final String listDelimiter; private final List columns; + @JsonIgnore + private final Supplier compiledPatternSupplier; @JsonCreator public RegexInputFormat( @@ -46,6 +52,27 @@ public class RegexInputFormat implements InputFormat this.pattern = pattern; this.listDelimiter = listDelimiter; this.columns = columns; + this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern)); + } + + @JsonProperty + public String getPattern() + { + return pattern; + } + + @Nullable + @JsonProperty + public String getListDelimiter() + { + return listDelimiter; + } + + @Nullable + @JsonProperty + public List getColumns() + { + return columns; } @Override @@ -57,6 +84,6 @@ public class RegexInputFormat implements InputFormat @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns); + return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java index 2962ebd0618..e29e1dfc521 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java @@ -42,7 +42,7 @@ import java.util.regex.Pattern; public class RegexReader extends TextReader { private final String pattern; - private final Pattern compiled; + private final Pattern compiledPattern; private final Function multiValueFunction; private List columns; @@ -51,13 +51,14 @@ public class RegexReader extends TextReader InputRowSchema inputRowSchema, InputEntity source, String pattern, + Pattern compiledPattern, @Nullable 
String listDelimiter, @Nullable List columns ) { super(inputRowSchema, source); this.pattern = pattern; - this.compiled = Pattern.compile(pattern); + this.compiledPattern = compiledPattern; final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); this.columns = columns; @@ -78,7 +79,7 @@ public class RegexReader extends TextReader private Map parseLine(String line) { try { - final Matcher matcher = compiled.matcher(line); + final Matcher matcher = compiledPattern.matcher(line); if (!matcher.matches()) { throw new ParseException("Incorrect Regex: %s . No match found.", pattern); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java new file mode 100644 index 00000000000..9754a75ee29 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputFormat; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class RegexInputFormatTest +{ + private final ObjectMapper mapper; + + public RegexInputFormatTest() + { + mapper = new ObjectMapper(); + mapper.registerSubtypes(new NamedType(RegexInputFormat.class, "regex")); + } + + @Test + public void testSerde() throws IOException + { + final RegexInputFormat expected = new RegexInputFormat( + "//[^\\r\\n]*[\\r\\n]", + "|", + ImmutableList.of("col1", "col2", "col3") + ); + + final byte[] json = mapper.writeValueAsBytes(expected); + final RegexInputFormat fromJson = (RegexInputFormat) mapper.readValue(json, InputFormat.class); + + Assert.assertEquals(expected.getPattern(), fromJson.getPattern()); + Assert.assertEquals(expected.getListDelimiter(), fromJson.getListDelimiter()); + Assert.assertEquals(expected.getColumns(), fromJson.getColumns()); + } + + @Test + public void testIgnoreCompiledPatternInJson() throws IOException + { + final RegexInputFormat expected = new RegexInputFormat( + "//[^\\r\\n]*[\\r\\n]", + "|", + ImmutableList.of("col1", "col2", "col3") + ); + + final byte[] json = mapper.writeValueAsBytes(expected); + final Map map = mapper.readValue(json, Map.class); + Assert.assertFalse(map.containsKey("compiledPattern")); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 9a18322058b..1577833d5fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -38,11 +38,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.Committer; -import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.LookupNodeService; @@ -74,7 +72,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -206,6 +203,7 @@ public abstract class SeekableStreamIndexTaskRunner parser; private final AuthorizerMapper authorizerMapper; private final Optional chatHandlerProvider; @@ -364,48 +362,24 @@ public abstract class SeekableStreamIndexTaskRunner parseBytes(List valueBytess) throws IOException - { - if (parser != null) { - return parseWithParser(valueBytess); - } else { - return parseWithInputFormat(valueBytess); - } - } - - private List parseWithParser(List valueBytess) - { - final List rows = new ArrayList<>(); - for (byte[] valueBytes : valueBytess) { - rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes))); - } - return rows; - } - - private List parseWithInputFormat(List valueBytess) throws IOException - { - final List rows = new ArrayList<>(); 
- for (byte[] valueBytes : valueBytess) { - final InputEntityReader reader = task.getDataSchema().getTransformSpec().decorate( - Preconditions.checkNotNull(inputFormat, "inputFormat").createReader( - inputRowSchema, - new ByteEntity(valueBytes), - toolbox.getIndexingTmpDir() - ) - ); - try (CloseableIterator rowIterator = reader.read()) { - rowIterator.forEachRemaining(rows::add); - } - } - return rows; - } - private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { startTime = DateTimes.nowUtc(); status = Status.STARTING; setToolbox(toolbox); + + // Now we can initialize StreamChunkParser with the given toolbox. + final StreamChunkParser parser = new StreamChunkParser( + this.parser, + new SettableByteEntityReader( + inputFormat, + inputRowSchema, + task.getDataSchema().getTransformSpec(), + toolbox.getIndexingTmpDir() + ) + ); + initializeSequences(); if (chatHandlerProvider.isPresent()) { @@ -657,7 +631,7 @@ public abstract class SeekableStreamIndexTaskRunner read() throws IOException + { + return delegate.read(); + } + + @Override + public CloseableIterator sample() throws IOException + { + return delegate.sample(); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java new file mode 100644 index 00000000000..3f9b28138d8 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader} + * or {@link InputRowParser}. This class will be useful until we remove the deprecated InputRowParser. 
+ */ +class StreamChunkParser +{ + @Nullable + private final InputRowParser parser; + private final SettableByteEntityReader byteEntityReader; + + StreamChunkParser(@Nullable InputRowParser parser, SettableByteEntityReader byteEntityReader) + { + this.parser = parser; + this.byteEntityReader = byteEntityReader; + } + + List parse(List streamChunk) throws IOException + { + if (parser != null) { + return parseWithParser(parser, streamChunk); + } else { + return parseWithInputFormat(byteEntityReader, streamChunk); + } + } + + private static List parseWithParser(InputRowParser parser, List valueBytess) + { + final List rows = new ArrayList<>(); + for (byte[] valueBytes : valueBytess) { + rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes))); + } + return rows; + } + + private static List parseWithInputFormat( + SettableByteEntityReader byteEntityReader, + List valueBytess + ) throws IOException + { + final List rows = new ArrayList<>(); + for (byte[] valueBytes : valueBytess) { + byteEntityReader.setEntity(new ByteEntity(valueBytes)); + try (CloseableIterator rowIterator = byteEntityReader.read()) { + rowIterator.forEachRemaining(rows::add); + } + } + return rows; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java index 3eae123d746..6de7ac9363c 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java @@ -22,7 +22,6 @@ package org.apache.druid.segment.transform; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.impl.InputRowParser; import 
org.apache.druid.data.input.impl.StringInputRowParser; @@ -118,11 +117,6 @@ public class TransformSpec return new TransformingInputSourceReader(reader, toTransformer()); } - public InputEntityReader decorate(InputEntityReader reader) - { - return new TransformingInputEntityReader(reader, toTransformer()); - } - /** * Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known * signature.