Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility (#8050)

* Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility

* do it for kafka only

* fix test
This commit is contained in:
Jihoon Son 2019-07-10 08:29:37 -07:00 committed by Fangjin Yang
parent 4e3314f675
commit fcf56f2330
4 changed files with 53 additions and 2 deletions

View File

@ -240,6 +240,10 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
// Kafka index task always uses incremental handoff since 0.16.0.
// The below is for the compatibility when you want to downgrade your cluster to something earlier than 0.16.0.
// Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@ -255,6 +256,52 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkUtils = null;
}
@Test
public void testCreateBaseTaskContexts() throws JsonProcessingException
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final Map<String, Object> contexts = supervisor.createIndexTasks(
1,
"seq",
objectMapper,
new TreeMap<>(),
new KafkaIndexTaskIOConfig(
0,
"seq",
new SeekableStreamStartSequenceNumbers<>("test", Collections.emptyMap(), Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>("test", Collections.emptyMap()),
Collections.emptyMap(),
null,
null,
null,
null
),
new KafkaIndexTaskTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null
).get(0).getContext();
final Boolean contextValue = (Boolean) contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
Assert.assertNotNull(contextValue);
Assert.assertTrue(contextValue);
}
@Test
public void testNoInitialState() throws Exception
{

View File

@ -202,7 +202,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor = null;
}
@Test
public void testNoInitialState() throws Exception
{

View File

@ -2799,7 +2799,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
protected Map<String, Object> createBaseTaskContexts()
@VisibleForTesting
public Map<String, Object> createBaseTaskContexts()
{
final Map<String, Object> contexts = new HashMap<>();
if (spec.getContext() != null) {