Remove LegacyKafkaIndexTaskRunner (#7735)

This commit is contained in:
Jihoon Son 2019-05-23 09:25:35 -07:00 committed by Fangjin Yang
parent 3dec5cd1e4
commit eff2be4f8f
5 changed files with 19 additions and 1338 deletions

View File

@ -28,7 +28,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
@ -129,28 +128,15 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
@Override
protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
{
if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
&& ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
//noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
this,
dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory
);
} else {
//noinspection unchecked
return new LegacyKafkaIndexTaskRunner(
this,
dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory
);
}
//noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
this,
dataSchema.getParser(),
authorizerMapper,
chatHandlerProvider,
savedParseExceptions,
rowIngestionMetersFactory
);
}
@Override

View File

@ -140,8 +140,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@ -165,8 +163,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
@ -191,7 +187,6 @@ import java.util.concurrent.TimeoutException;
import static org.apache.druid.query.QueryPlus.wrap;
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@ -228,24 +223,10 @@ public class KafkaIndexTaskTest
private File directory;
private String topic;
private List<ProducerRecord<byte[], byte[]>> records;
private final boolean isIncrementalHandoffSupported;
private final Set<Integer> checkpointRequestsHash = new HashSet<>();
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
// This should be removed in versions greater that 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
@Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{true}, new Object[]{false});
}
public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
{
this.isIncrementalHandoffSupported = isIncrementalHandoffSupported;
}
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
OBJECT_MAPPER.convertValue(
@ -502,9 +483,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOff() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
@ -608,9 +586,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOffMaxTotalRows() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// incremental publish should happen every 3 records
maxRowsPerSegment = Integer.MAX_VALUE;
@ -763,9 +738,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testTimeBasedIncrementalHandOff() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
maxRowsPerSegment = Integer.MAX_VALUE;
@ -853,9 +825,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
records = generateSinglePartitionRecords(topic);
final String baseSequenceName = "sequence0";
@ -1675,9 +1644,7 @@ public class KafkaIndexTaskTest
// desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
Assert.assertEquals(isIncrementalHandoffSupported
? ImmutableSet.of(desc1, desc2, desc4)
: ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L, 1, 2L))
@ -1691,12 +1658,8 @@ public class KafkaIndexTaskTest
// Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
Assert.assertEquals(
isIncrementalHandoffSupported
? ImmutableSet.of(ImmutableList.of("d", "e", "h"))
: ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
isIncrementalHandoffSupported
? ImmutableSet.of(readSegmentColumn("dim1", desc2))
: ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
ImmutableSet.of(ImmutableList.of("d", "e", "h")),
ImmutableSet.of(readSegmentColumn("dim1", desc2))
);
}
@ -1867,10 +1830,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testRestoreAfterPersistingSequences() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
records = generateSinglePartitionRecords(topic);
maxRowsPerSegment = 2;
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
@ -2136,12 +2095,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
{
// This tests the case when a replacement task is created in place of a failed test
// which has done some incremental handoffs, thus the context will contain starting
// sequence offsets from which the task should start reading and ignore the start offsets
if (!isIncrementalHandoffSupported) {
return;
}
// Insert data
insertData();
@ -2344,9 +2297,6 @@ public class KafkaIndexTaskTest
@Test(timeout = 60_000L)
public void testCanStartFromLaterThanEarliestOffset() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
maxRowsPerSegment = Integer.MAX_VALUE;
maxTotalRows = null;
@ -2542,17 +2492,13 @@ public class KafkaIndexTaskTest
maxParseExceptions,
maxSavedParseExceptions
);
if (isIncrementalHandoffSupported) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
final String checkpointsJson = OBJECT_MAPPER
.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
}
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
final String checkpointsJson = OBJECT_MAPPER
.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
}
final KafkaIndexTask task = new KafkaIndexTask(
@ -2736,14 +2682,6 @@ public class KafkaIndexTaskTest
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return new ArrayList<>();
}
};
toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,

View File

@ -2664,8 +2664,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
) throws JsonProcessingException
{
if (context != null) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());

View File

@ -121,7 +121,6 @@ import java.util.stream.Stream;
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
@ -2752,7 +2751,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected Map<String, Object> createBaseTaskContexts()
{
final Map<String, Object> contexts = new HashMap<>();
contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (spec.getContext() != null) {
contexts.putAll(spec.getContext());
}