From ae617bf5dd47a76560f842e1356949979ec27f3d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 26 Feb 2020 22:05:47 -0800 Subject: [PATCH] Clarify InputSource.isSplittable usage. (#9424) Also removes TimedShutoffInputSource, which had a bug in isSplittable (it improperly returned true, even though it didn't implement SplittableInputSource). This bug had no user-visible impact, since the code wasn't used. --- .../apache/druid/data/input/InputSource.java | 3 +- .../input/impl/TimedShutoffInputSource.java | 86 ------------------- .../impl/TimedShutoffInputSourceTest.java | 57 ------------ 3 files changed, 2 insertions(+), 144 deletions(-) delete mode 100644 core/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSource.java delete mode 100644 core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java index 8932c857928..b0144c51eef 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java @@ -55,7 +55,8 @@ import java.io.File; public interface InputSource { /** - * Returns true if this inputSource can be processed in parallel using ParallelIndexSupervisorTask. + * Returns true if this inputSource can be processed in parallel using ParallelIndexSupervisorTask. It must be + * castable to SplittableInputSource and the various SplittableInputSource methods must work as documented. */ boolean isSplittable(); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSource.java deleted file mode 100644 index fd944e5835c..00000000000 --- a/core/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSource.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.data.input.impl; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSource; -import org.apache.druid.data.input.InputSourceReader; -import org.joda.time.DateTime; - -import javax.annotation.Nullable; -import java.io.File; - -/** - * A wrapping InputSource that will close the underlying InputSource at {@link #shutoffTime}. - * This InputSource is supposed to be used only for InputSourceSampler. - */ -public class TimedShutoffInputSource implements InputSource -{ - private final InputSource delegate; - private final DateTime shutoffTime; - - public TimedShutoffInputSource( - @JsonProperty("delegate") InputSource delegate, - @JsonProperty("shutoffTime") DateTime shutoffTime - ) - { - this.delegate = delegate; - this.shutoffTime = shutoffTime; - } - - @JsonProperty - public InputSource getDelegate() - { - return delegate; - } - - @JsonProperty - public DateTime getShutoffTime() - { - return shutoffTime; - } - - @Override - public boolean isSplittable() - { - return delegate.isSplittable(); - } - - @Override - public boolean needsFormat() - { - return delegate.needsFormat(); - } - - @Override - public InputSourceReader reader( - InputRowSchema inputRowSchema, - @Nullable InputFormat inputFormat, - File temporaryDirectory - ) - { - return new TimedShutoffInputSourceReader( - delegate.reader(inputRowSchema, inputFormat, temporaryDirectory), - shutoffTime - ); - } -} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java deleted file mode 100644 index 2b4b2fc860a..00000000000 --- a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.data.input.impl; - -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowListPlusRawValues; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSource; -import org.apache.druid.data.input.InputSourceReader; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; - -public class TimedShutoffInputSourceTest -{ - @Test - public void testTimeoutShutoff() throws IOException, InterruptedException - { - final int timeoutMs = 2000; - final InputSource inputSource = new TimedShutoffInputSource( - new InlineInputSource("this,is,test\nthis,data,has\n3,rows,\n"), - DateTimes.nowUtc().plusMillis(timeoutMs) - ); - final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, null, false, 0); - final InputSourceReader reader = inputSource.reader( - new InputRowSchema(new TimestampSpec(null, null, null), new DimensionsSpec(null), Collections.emptyList()), - inputFormat, - null - ); - try (CloseableIterator iterator = reader.sample()) { - Thread.sleep(timeoutMs + 1000); - Assert.assertFalse(iterator.hasNext()); - } - } -}