mirror of https://github.com/apache/druid.git
Clarify InputSource.isSplittable usage. (#9424)
Also removes TimedShutoffInputSource, which had a bug in isSplittable (it improperly returned true, even though it didn't implement SplittableInputSource). This bug had no user-visible impact, since the code wasn't used.
This commit is contained in:
parent
5d05b40e6d
commit
ae617bf5dd
|
@ -55,7 +55,8 @@ import java.io.File;
|
||||||
public interface InputSource
|
public interface InputSource
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Returns true if this inputSource can be processed in parallel using ParallelIndexSupervisorTask.
|
* Returns true if this inputSource can be processed in parallel using ParallelIndexSupervisorTask. It must be
|
||||||
|
* castable to SplittableInputSource and the various SplittableInputSource methods must work as documented.
|
||||||
*/
|
*/
|
||||||
boolean isSplittable();
|
boolean isSplittable();
|
||||||
|
|
||||||
|
|
|
@ -1,86 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.data.input.impl;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import org.apache.druid.data.input.InputFormat;
|
|
||||||
import org.apache.druid.data.input.InputRowSchema;
|
|
||||||
import org.apache.druid.data.input.InputSource;
|
|
||||||
import org.apache.druid.data.input.InputSourceReader;
|
|
||||||
import org.joda.time.DateTime;
|
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.io.File;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A wrapping InputSource that will close the underlying InputSource at {@link #shutoffTime}.
|
|
||||||
* This InputSource is supposed to be used only for InputSourceSampler.
|
|
||||||
*/
|
|
||||||
public class TimedShutoffInputSource implements InputSource
|
|
||||||
{
|
|
||||||
private final InputSource delegate;
|
|
||||||
private final DateTime shutoffTime;
|
|
||||||
|
|
||||||
public TimedShutoffInputSource(
|
|
||||||
@JsonProperty("delegate") InputSource delegate,
|
|
||||||
@JsonProperty("shutoffTime") DateTime shutoffTime
|
|
||||||
)
|
|
||||||
{
|
|
||||||
this.delegate = delegate;
|
|
||||||
this.shutoffTime = shutoffTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
public InputSource getDelegate()
|
|
||||||
{
|
|
||||||
return delegate;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
public DateTime getShutoffTime()
|
|
||||||
{
|
|
||||||
return shutoffTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isSplittable()
|
|
||||||
{
|
|
||||||
return delegate.isSplittable();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean needsFormat()
|
|
||||||
{
|
|
||||||
return delegate.needsFormat();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InputSourceReader reader(
|
|
||||||
InputRowSchema inputRowSchema,
|
|
||||||
@Nullable InputFormat inputFormat,
|
|
||||||
File temporaryDirectory
|
|
||||||
)
|
|
||||||
{
|
|
||||||
return new TimedShutoffInputSourceReader(
|
|
||||||
delegate.reader(inputRowSchema, inputFormat, temporaryDirectory),
|
|
||||||
shutoffTime
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.data.input.impl;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import org.apache.druid.data.input.InputFormat;
|
|
||||||
import org.apache.druid.data.input.InputRowListPlusRawValues;
|
|
||||||
import org.apache.druid.data.input.InputRowSchema;
|
|
||||||
import org.apache.druid.data.input.InputSource;
|
|
||||||
import org.apache.druid.data.input.InputSourceReader;
|
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
|
||||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
public class TimedShutoffInputSourceTest
|
|
||||||
{
|
|
||||||
@Test
|
|
||||||
public void testTimeoutShutoff() throws IOException, InterruptedException
|
|
||||||
{
|
|
||||||
final int timeoutMs = 2000;
|
|
||||||
final InputSource inputSource = new TimedShutoffInputSource(
|
|
||||||
new InlineInputSource("this,is,test\nthis,data,has\n3,rows,\n"),
|
|
||||||
DateTimes.nowUtc().plusMillis(timeoutMs)
|
|
||||||
);
|
|
||||||
final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, null, false, 0);
|
|
||||||
final InputSourceReader reader = inputSource.reader(
|
|
||||||
new InputRowSchema(new TimestampSpec(null, null, null), new DimensionsSpec(null), Collections.emptyList()),
|
|
||||||
inputFormat,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
|
|
||||||
Thread.sleep(timeoutMs + 1000);
|
|
||||||
Assert.assertFalse(iterator.hasNext());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue