Use earliest offset for newly discovered Kafka partitions (#8748)

* Use earliest offset on kafka newly discovered partitions

* resolve conflicts

* remove redundant check cases

* simplified unit tests

* change test case

* rewrite comments

* add regression test

* add junit ignore annotation

* minor modifications

* indent

* override TestableKafkaSupervisor and KafkaRecordSupplier to make the test runnable

* modified test constructor of KafkaRecordSupplier

* simplify

* delegated constructor
Authored by Rye on 2019-11-18 11:05:31 -08:00, committed by Jonathan Wei
parent 80fc04be71
commit ea8e4066f6
4 changed files with 248 additions and 42 deletions
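
The substance of the change: partitions that the supervisor discovers only after its initial discovery pass should always start reading from the stream's earliest available offset, regardless of the useEarliestOffset setting, so that records written to a new partition before the supervisor notices it are not skipped. A minimal sketch of that decision, using made-up names (the real logic lives in SeekableStreamSupervisor and its RecordSupplier, not in a class like this):

// Illustrative sketch only; OffsetChoiceSketch and its parameters are not part of Druid.
import java.util.Map;
import java.util.Set;

final class OffsetChoiceSketch
{
  static long startingOffset(
      int partitionId,
      Set<Integer> lateDiscoveredPartitions, // partitions first seen after the initial discovery pass
      Map<Integer, Long> storedOffsets,      // offsets recovered from datasource metadata, if any
      Map<Integer, Long> earliestOffsets,    // earliest available offset per partition
      Map<Integer, Long> latestOffsets,      // latest available offset per partition
      boolean useEarliestOffset              // the supervisor ioConfig flag
  )
  {
    if (lateDiscoveredPartitions.contains(partitionId)) {
      // Newly discovered partition: always start from earliest, regardless of configuration,
      // so records produced before the supervisor noticed the partition are not skipped.
      return earliestOffsets.get(partitionId);
    }
    if (storedOffsets.containsKey(partitionId)) {
      // Known partition with published offsets: resume where the last tasks left off.
      return storedOffsets.get(partitionId);
    }
    // Known partition with no stored state: fall back to the configured default.
    return useEarliestOffset ? earliestOffsets.get(partitionId) : latestOffsets.get(partitionId);
  }
}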

File: KafkaRecordSupplier.java

@@ -20,6 +20,7 @@
package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -60,10 +61,20 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
    Map<String, Object> consumerProperties,
    ObjectMapper sortingMapper
)
+{
+  this(consumerProperties, sortingMapper, getKafkaConsumer(sortingMapper, consumerProperties));
+}
+
+@VisibleForTesting
+public KafkaRecordSupplier(
+    Map<String, Object> consumerProperties,
+    ObjectMapper sortingMapper,
+    KafkaConsumer<byte[], byte[]> consumer
+)
{
  this.consumerProperties = consumerProperties;
  this.sortingMapper = sortingMapper;
-  this.consumer = getKafkaConsumer();
+  this.consumer = consumer;
}

@Override
@@ -201,11 +212,14 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
  }
}

-private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
{
  Deserializer deserializerObject;
  try {
-    Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
+    Class deserializerClass = Class.forName(properties.getProperty(
+        kafkaConfigKey,
+        ByteArrayDeserializer.class.getTypeName()
+    ));

    Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
    Type deserializerReturnType = deserializerMethod.getGenericReturnType();
@@ -213,7 +227,9 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
    if (deserializerReturnType == byte[].class) {
      deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
    } else {
-      throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
+      throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " +
+          deserializerClass.getName() + " returns " +
+          deserializerReturnType.getTypeName());
    }
  }
  catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
@@ -222,7 +238,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
  return deserializerObject;
}

-private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
+private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper sortingMapper, Map<String, Object> consumerProperties)
{
  final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
  final Properties props = new Properties();
@@ -231,7 +247,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
  ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
  try {
-    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+    Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
    Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
    Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
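
The new @VisibleForTesting constructor above is what the supervisor test relies on: a caller can build a specially configured KafkaConsumer (for example, one with a very low metadata.max.age.ms so newly added partitions are noticed quickly) and hand it to the supplier instead of letting the supplier construct its own. A rough usage sketch, assuming only that a broker is reachable at the given bootstrap.servers and standing in a plain ObjectMapper where Druid would normally inject its sorting mapper:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class InjectedConsumerExample
{
  static KafkaRecordSupplier buildSupplier()
  {
    Map<String, Object> consumerProperties = new HashMap<>();
    consumerProperties.put("bootstrap.servers", "localhost:9092"); // assumption: a locally reachable broker

    Properties props = new Properties();
    props.putAll(consumerProperties);
    props.setProperty("metadata.max.age.ms", "1"); // refresh partition metadata aggressively, as the test does

    // The injected consumer replaces the one the production constructor would otherwise build.
    return new KafkaRecordSupplier(
        consumerProperties,
        new ObjectMapper(), // a plain mapper stands in for Druid's injected sorting mapper in this sketch
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())
    );
  }
}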

File: KafkaSupervisor.java

@@ -156,9 +156,13 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
@Override
-protected int getTaskGroupIdForPartition(Integer partition)
+protected int getTaskGroupIdForPartition(Integer partitionId)
{
-  return partition % spec.getIoConfig().getTaskCount();
+  // record partitionIds so that supervisor knows when a partition is discovered.
+  if (!partitionIds.contains(partitionId)) {
+    partitionIds.add(partitionId);
+  }
+  return partitionId % spec.getIoConfig().getTaskCount();
}

@Override
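
As the new comment in this hunk says, recording every partition id that passes through getTaskGroupIdForPartition is what lets the supervisor tell a pre-existing partition from a newly discovered one. The base-class check that consumes this information appears as unchanged context further down in this diff (in SeekableStreamSupervisor):

if (!initialPartitionDiscovery && !this.partitionIds.contains(partitionId)) {
  subsequentlyDiscoveredPartitions.add(partitionId);
}

Partitions collected in subsequentlyDiscoveredPartitions are the ones whose starting offset is then resolved to the stream's earliest offset instead of the configured default; that resolution itself happens in SeekableStreamSupervisor code outside the lines shown here.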

File: KafkaSupervisorTest.java

@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
+import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
@@ -51,6 +52,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -67,6 +69,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.java.util.common.DateTimes;
@@ -86,9 +89,12 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@@ -105,6 +111,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import scala.Option;
+import scala.collection.Seq;

import java.io.File;
import java.io.IOException;
@@ -585,6 +593,103 @@ public class KafkaSupervisorTest extends EasyMockSupport
  );
}

/**
* Test if partitionIds get updated
*/
@Test
public void testPartitionIdsUpdates() throws Exception
{
supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
addSomeEvents(1100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
Assert.assertFalse(supervisor.isPartitionIdsEmpty());
}
@Test
public void testAlwaysUsesEarliestOffsetForNewlyDiscoveredPartitions() throws Exception
{
supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
addSomeEvents(9);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(
10,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
);
Assert.assertEquals(
10,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
);
Assert.assertEquals(
10,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
);
addMoreEvents(9, 6);
EasyMock.reset(taskQueue, taskStorage);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
Capture<KafkaIndexTask> tmp = Capture.newInstance();
EasyMock.expect(taskQueue.add(EasyMock.capture(tmp))).andReturn(true);
EasyMock.replay(taskStorage, taskQueue);
supervisor.runInternal();
verifyAll();
EasyMock.reset(taskQueue, taskStorage);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
Capture<KafkaIndexTask> newcaptured = Capture.newInstance();
EasyMock.expect(taskQueue.add(EasyMock.capture(newcaptured))).andReturn(true);
EasyMock.replay(taskStorage, taskQueue);
supervisor.runInternal();
verifyAll();
// check that the newly discovered partitions start from the earliest offset
task = newcaptured.getValue();
Assert.assertEquals(
0,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(3).longValue()
);
Assert.assertEquals(
0,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(4).longValue()
);
Assert.assertEquals(
0,
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(5).longValue()
);
}
/**
 * Test generating the starting offsets from the partition data stored in druid_dataSource which contains the
 * offsets of the last built segments.
@@ -640,7 +745,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
// because the stream's earliest offset is 0, although that would not happen in real usage.
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
    new KafkaDataSourceMetadata(
-       new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, -10L, 1, -20L, 2, -30L), ImmutableSet.of())
+       new SeekableStreamStartSequenceNumbers<>(
+           topic,
+           ImmutableMap.of(0, -10L, 1, -20L, 2, -30L),
+           ImmutableSet.of()
+       )
    )
).anyTimes();
replayAll();
@@ -973,7 +1082,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
    .andReturn(Futures.immediateFuture(checkpoints))
    .times(1);
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(captured.getValue()))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
        .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
EasyMock.expect(taskStorage.getStatus(runningTaskId))
@@ -1477,7 +1588,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
@@ -1865,7 +1978,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2130,7 +2245,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2228,7 +2345,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2338,7 +2457,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2445,7 +2566,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2576,7 +2699,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+        .andReturn(ImmutableList.of(id1, id2, id3))
+        .anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -3135,6 +3260,46 @@ public class KafkaSupervisorTest extends EasyMockSupport
  }
}

private void addMoreEvents(int numEventsPerPartition, int num_partitions) throws Exception
{
Seq<BrokerMetadata> brokerList = AdminUtils.getBrokerMetadatas(
zkUtils,
RackAwareMode.Enforced$.MODULE$,
Option.apply(zkUtils.getSortedBrokerList())
);
scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils.assignReplicasToBrokers(
brokerList,
num_partitions,
1, 0, 0
);
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
zkUtils,
topic,
replicaAssignment,
new Properties(),
true
);
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = NUM_PARTITIONS; i < num_partitions; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
kafkaProducer.send(
new ProducerRecord<>(
topic,
i,
null,
StringUtils.toUtf8(StringUtils.format("event-%d", j))
)
).get();
}
}
kafkaProducer.commitTransaction();
}
}
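
The helper above grows the test topic through the Scala AdminUtils API bundled with the embedded broker. On Kafka client versions that ship the Java admin API, the same partition expansion could presumably be done without the Scala classes; a sketch under that assumption (AdminClient.createPartitions and NewPartitions.increaseTo are standard Kafka client calls, but whether they are available depends on the client version the test module uses):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Collections;
import java.util.Properties;

final class ExpandTopicSketch
{
  // Grows "topic" to totalPartitions partitions; equivalent in spirit to the AdminUtils calls above.
  static void expand(String bootstrapServers, String topic, int totalPartitions) throws Exception
  {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    try (AdminClient admin = AdminClient.create(props)) {
      admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(totalPartitions)))
           .all()
           .get(); // block until the partition count change is applied
    }
  }
}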
private TestableKafkaSupervisor getTestableSupervisor(
    int replicas,
    int taskCount,
@@ -3652,6 +3817,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static class TestableKafkaSupervisor extends KafkaSupervisor
{
+  private final Map<String, Object> consumerProperties;
+
  public TestableKafkaSupervisor(
      TaskStorage taskStorage,
      TaskMaster taskMaster,
@@ -3671,6 +3838,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
      spec,
      rowIngestionMetersFactory
  );
this.consumerProperties = spec.getIoConfig().getConsumerProperties();
}
@Override
protected RecordSupplier<Integer, Long> setupRecordSupplier()
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
consumerConfigs.put("metadata.max.age.ms", "1");
final Properties props = new Properties();
KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
props.putAll(consumerConfigs);
Deserializer keyDeserializerObject = new ByteArrayDeserializer();
Deserializer valueDeserializerObject = new ByteArrayDeserializer();
return new KafkaRecordSupplier(
consumerProperties,
sortingMapper,
new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject)
);
}

@Override

File: SeekableStreamSupervisor.java

@@ -511,7 +511,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.exec = Execs.singleThreaded(supervisorId);
this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + "-Reporting-%d");
-this.stateManager = new SeekableStreamSupervisorStateManager(spec.getSupervisorStateManagerConfig(), spec.isSuspended());
+this.stateManager = new SeekableStreamSupervisorStateManager(
+    spec.getSupervisorStateManagerConfig(),
+    spec.isSuspended()
+);

int workerThreads = (this.tuningConfig.getWorkerThreads() != null
                     ? this.tuningConfig.getWorkerThreads()
@@ -898,7 +901,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 * Collect row ingestion stats from all tasks managed by this supervisor.
 *
 * @return A map of groupId->taskId->task row stats
- *
 * @throws InterruptedException
 * @throws ExecutionException
 * @throws TimeoutException
@@ -1885,7 +1887,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("partition [%s] is closed and has no more data, skipping.", partitionId);
continue;
}
-
if (!initialPartitionDiscovery && !this.partitionIds.contains(partitionId)) {
  subsequentlyDiscoveredPartitions.add(partitionId);
}
@@ -1948,15 +1949,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
 * This method determines the set of expired partitions from the set of partitions currently returned by
 * the record supplier and the set of partitions previously tracked in the metadata.
- *
+ * <p>
 * It will mark the expired partitions in metadata and recompute the partition->task group mappings, updating
 * the metadata, the partitionIds list, and the partitionGroups mappings.
- *
+ * <p>
 * Note that partition IDs that were newly discovered (appears in record supplier set but not in metadata set)
 * are not added to the recomputed partition groups here. This is handled later in
 * {@link #updatePartitionDataFromStream} after this method is called.
 *
 * @param storedPartitions         Set of partitions previously tracked, from the metadata store
 * @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier.
 */
private void cleanupExpiredPartitions(
@@ -2021,10 +2022,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
 * When partitions are removed due to expiration it may be necessary to recompute the partitionID -> groupID
 * mappings to ensure balanced distribution of partitions.
- *
+ * <p>
 * This function should return a copy of partitionGroups, using the provided availablePartitions as the list of
 * active partitions, reassigning partitions to different groups if necessary.
- *
+ * <p>
 * If a partition is not in availablePartitions, it should be filtered out of the new partition groups returned
 * by this method.
@@ -2039,12 +2040,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}

/**
- *
 * Some seekable stream systems such as Kinesis allow partitions to expire. When this occurs, the supervisor should
 * mark the expired partitions in the saved metadata. This method returns a copy of the current metadata
 * with any expired partitions marked with an implementation-specific offset value that represents the expired state.
 *
 * @param currentMetadata     The current DataSourceMetadata from metadata storage
 * @param expiredPartitionIds The set of expired partition IDs.
 * @return currentMetadata but with any expired partitions removed.
 */
@@ -2059,12 +2059,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
 * Perform a sanity check on the datasource metadata returned by
 * {@link #createDataSourceMetadataWithExpiredPartitions}.
- *
+ * <p>
 * Specifically, we check that the cleaned metadata's partitions are a subset of the original metadata's partitions,
 * that newly expired partitions are marked as expired, and that none of the offsets for the non-expired partitions
 * have changed.
 *
 * @param oldMetadata     metadata containing expired partitions.
 * @param cleanedMetadata new metadata without expired partitions, generated by the subclass
 */
private void validateMetadataPartitionExpiration(
@@ -2074,10 +2074,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
)
{
  Map<PartitionIdType, SequenceOffsetType> oldPartitionSeqNos = oldMetadata.getSeekableStreamSequenceNumbers()
                                                                           .getPartitionSequenceNumberMap();
  Map<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNos = cleanedMetadata.getSeekableStreamSequenceNumbers()
                                                                                   .getPartitionSequenceNumberMap();

  for (Entry<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNo : cleanedPartitionSeqNos.entrySet()) {
    if (!oldPartitionSeqNos.containsKey(cleanedPartitionSeqNo.getKey())) {
@@ -2116,7 +2116,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
 * Perform a sanity check on the new partition groups returned by
 * {@link #recomputePartitionGroupsForExpiration}.
- *
+ * <p>
 * Specifically, we check that the new partition groups' partitions are a subset of the original groups' partitions,
 * and that none of the offsets for the non-expired partitions have changed.
 *
@@ -3134,6 +3134,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
  return partitionGroups;
}

+@VisibleForTesting
+public boolean isPartitionIdsEmpty()
+{
+  return this.partitionIds.isEmpty();
+}
+
/**
 * creates a specific task IOConfig instance for Kafka/Kinesis
 *
@@ -3155,7 +3161,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 * the given replicas count
 *
 * @return list of specific kafka/kinesis index taksks
- *
 * @throws JsonProcessingException
 */
protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> createIndexTasks(
@@ -3173,7 +3178,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 * different between Kafka/Kinesis since Kinesis uses String as partition id
 *
 * @param partition partition id
- *
 * @return taskgroup id
 */
protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3183,7 +3187,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 * of [kafka/kinesis]DataSourceMetadata
 *
 * @param metadata datasource metadata
- *
 * @return true if isInstance else false
 */
protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata metadata);
@@ -3193,7 +3196,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 * [Kafka/Kinesis]IndexTask
 *
 * @param task task
- *
 * @return true if isInstance else false
 */
protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -3203,7 +3205,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 *
 * @param stream stream name
 * @param map    partitionId -> sequence
- *
 * @return specific instance of datasource metadata
 */
protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(