Fix to always use end sequenceNumber for reset (#8305)

* Fix to always use end sequenceNumber for reset

* fix checkstyle

* fix style and add log
Jihoon Son 2019-08-22 14:51:25 -07:00 committed by Jonathan Wei
parent 71676cd16d
commit fba92ae469
8 changed files with 419 additions and 110 deletions
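The heart of the change: both supervisors' createDataSourceMetaDataForReset() and the automatic-reset path in SeekableStreamIndexTaskRunner now build reset metadata from end sequence numbers instead of start sequence numbers, so no exclusive-offset set is involved. A minimal before/after sketch of the Kafka variant follows (the Kinesis supervisor makes the symmetric change); the class and parameter names are taken from the diff below, everything else is trimmed:

```java
// Sketch only - condensed from the KafkaSupervisor hunk below, not a complete class.
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<Integer, Long> map)
{
  // Before this commit: reset metadata was start-based and carried an (empty) exclusive-offset set.
  //   return new KafkaDataSourceMetadata(
  //       new SeekableStreamStartSequenceNumbers<>(topic, map, Collections.emptySet()));

  // After this commit: reset metadata is always end-based, so only the stream name and offsets are needed.
  return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
```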

KafkaSupervisor.java

@ -289,7 +289,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<Integer, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, map, Collections.emptySet()));
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@Override
@ -392,4 +392,10 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
{
return spec.getIoConfig();
}
@VisibleForTesting
public KafkaSupervisorTuningConfig getTuningConfig()
{
return spec.getTuningConfig();
}
}

KafkaSupervisorTuningConfig.java

@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfigs;
@ -177,7 +176,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
}
@Override
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KafkaIndexTaskTuningConfig(
getMaxRowsInMemory(),
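The tuning-config hunk above, and its Kinesis counterpart further down, narrow the return type of convertToTaskTuningConfig() from the base SeekableStreamIndexTaskTuningConfig to the concrete task tuning config. Java permits covariant return types on overrides, which is what lets KafkaSupervisorTest later in this diff drop its explicit KafkaIndexTaskTuningConfig cast. A tiny, self-contained illustration with made-up names (none of these classes are from Druid):

```java
// Illustration of covariant return types with invented names; compile with: javac CovariantDemo.java
class BaseTaskTuning { }
class ConcreteTaskTuning extends BaseTaskTuning { }

class BaseSupervisorTuning {
  BaseTaskTuning convertToTaskTuningConfig() { return new BaseTaskTuning(); }
}

class ConcreteSupervisorTuning extends BaseSupervisorTuning {
  @Override
  ConcreteTaskTuning convertToTaskTuningConfig() { return new ConcreteTaskTuning(); } // narrowed return type
}

class CovariantDemo {
  public static void main(String[] args) {
    // No cast needed: the override's declared return type is already the concrete one.
    ConcreteTaskTuning tuning = new ConcreteSupervisorTuning().convertToTaskTuningConfig();
    System.out.println(tuning.getClass().getSimpleName());
  }
}
```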

KafkaSupervisorTest.java

@ -141,7 +141,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private final int numThreads;
private TestableKafkaSupervisor supervisor;
private KafkaSupervisorTuningConfig tuningConfig;
private TaskStorage taskStorage;
private TaskMaster taskMaster;
private TaskRunner taskRunner;
@ -205,33 +204,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient = createMock(KafkaIndexTaskClient.class);
taskQueue = createMock(TaskQueue.class);
tuningConfig = new KafkaSupervisorTuningConfig(
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
null,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
);
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new ExceptionCapturingServiceEmitter();
@ -309,6 +281,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testNoInitialState() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@ -693,7 +666,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L)),
null,
null
null,
supervisor.getTuningConfig()
);
// non KafkaIndexTask (don't kill)
@ -741,6 +715,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testKillBadPartitionAssignment() throws Exception
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -750,7 +725,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 2, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
"id2",
@ -759,7 +735,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(1, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
Task id3 = createKafkaIndexTask(
"id3",
@ -771,7 +748,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id4 = createKafkaIndexTask(
"id4",
@ -780,7 +758,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
Task id5 = createKafkaIndexTask(
"id5",
@ -789,7 +768,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(1, 0L, 2, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
@ -949,7 +929,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 2, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
now,
maxi
maxi,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(id1);
@ -1141,6 +1122,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@ -1236,6 +1218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task task = createKafkaIndexTask(
@ -1248,7 +1231,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
supervisor.getTuningConfig()
);
Collection workItems = new ArrayList<>();
@ -1353,6 +1337,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task task = createKafkaIndexTask(
@ -1362,7 +1347,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 2, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
null,
supervisor.getTuningConfig()
);
Collection workItems = new ArrayList<>();
@ -1463,6 +1449,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(6);
Task id1 = createKafkaIndexTask(
@ -1475,7 +1462,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
@ -1488,7 +1476,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Collection workItems = new ArrayList<>();
@ -1834,6 +1823,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -1846,7 +1836,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
@ -1859,7 +1850,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id3 = createKafkaIndexTask(
@ -1872,7 +1864,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Collection workItems = new ArrayList<>();
@ -2047,6 +2040,44 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception
{
addSomeEvents(2);
supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.reset(indexerMetadataStorageCoordinator);
// unknown DataSourceMetadata in metadata store
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, 100L, 2, 200L))
)
).times(4);
// getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
// Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
// Instead, subsequent partitions will be reset in the following supervisor runs.
EasyMock.expect(
indexerMetadataStorageCoordinator.resetDataSourceMetadata(
DATASOURCE,
new KafkaDataSourceMetadata(
// Only one partition is reset in a single supervisor run.
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(2, 200L))
)
)
).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
}
@Test
public void testResetRunningTasks() throws Exception
{
@ -2055,6 +2086,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -2067,7 +2099,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
@ -2080,7 +2113,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id3 = createKafkaIndexTask(
@ -2093,7 +2127,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Collection workItems = new ArrayList<>();
@ -2153,6 +2188,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
supervisor.getStateManager().markRunFinished();
//not adding any events
@ -2166,7 +2202,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
@ -2179,7 +2216,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id3 = createKafkaIndexTask(
@ -2192,7 +2230,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2251,6 +2290,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
throws InterruptedException
{
supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
supervisor.getStateManager().markRunFinished();
//not adding any events
@ -2264,7 +2304,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
final Task id2 = createKafkaIndexTask(
@ -2277,7 +2318,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
final Task id3 = createKafkaIndexTask(
@ -2290,7 +2332,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
@ -2365,6 +2408,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
throws InterruptedException
{
supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
//not adding any events
final KafkaIndexTask id1 = createKafkaIndexTask(
"id1",
@ -2376,7 +2420,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
final Task id2 = createKafkaIndexTask(
@ -2389,7 +2434,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
final Task id3 = createKafkaIndexTask(
@ -2402,7 +2448,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2455,6 +2502,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
throws InterruptedException
{
supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
supervisor.getStateManager().markRunFinished();
//not adding any events
@ -2465,7 +2513,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
final Task id2 = createKafkaIndexTask(
@ -2475,7 +2524,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
final Task id3 = createKafkaIndexTask(
@ -2485,7 +2535,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
null,
tuningConfig
);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2581,6 +2632,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@ -2593,7 +2645,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id2 = createKafkaIndexTask(
@ -2606,7 +2659,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Task id3 = createKafkaIndexTask(
@ -2619,7 +2673,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
tuningConfig
);
Collection workItems = new ArrayList<>();
@ -2715,6 +2770,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
false,
StringUtils.format("badhostname:%d", kafkaServer.getPort())
);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2885,7 +2941,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null
null,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(task);
@ -2954,7 +3011,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
),
new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
null,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(task);
@ -3001,7 +3059,32 @@ public class KafkaSupervisorTest extends EasyMockSupport
false,
kafkaHost,
dataSchema,
tuningConfig
new KafkaSupervisorTuningConfig(
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
false,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
)
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@ -3056,7 +3139,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
),
minMessageTime,
maxMessageTime,
dataSchema
dataSchema,
supervisor.getTuningConfig()
);
KafkaIndexTask taskFromStorageMismatchedDataSchema = createKafkaIndexTask(
@ -3073,7 +3157,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
),
minMessageTime,
maxMessageTime,
modifiedDataSchema
modifiedDataSchema,
supervisor.getTuningConfig()
);
KafkaIndexTask taskFromStorageMismatchedTuningConfig = createKafkaIndexTask(
@ -3108,7 +3193,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
),
minMessageTime,
maxMessageTime,
dataSchema
dataSchema,
supervisor.getTuningConfig()
);
EasyMock.expect(taskStorage.getTask("id1"))
@ -3170,6 +3256,28 @@ public class KafkaSupervisorTest extends EasyMockSupport
replicas,
taskCount,
useEarliestOffset,
false,
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod
);
}
private TestableKafkaSupervisor getTestableSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
boolean resetOffsetAutomatically,
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod
)
{
return getTestableSupervisor(
replicas,
taskCount,
useEarliestOffset,
resetOffsetAutomatically,
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
@ -3188,6 +3296,31 @@ public class KafkaSupervisorTest extends EasyMockSupport
boolean suspended,
String kafkaHost
)
{
return getTestableSupervisor(
replicas,
taskCount,
useEarliestOffset,
false,
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended,
kafkaHost
);
}
private TestableKafkaSupervisor getTestableSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
boolean resetOffsetAutomatically,
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
String kafkaHost
)
{
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("myCustomKey", "myCustomValue");
@ -3228,6 +3361,33 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
resetOffsetAutomatically,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
);
return new TestableKafkaSupervisor(
taskStorage,
taskMaster,
@ -3308,6 +3468,33 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
false,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
);
return new TestableKafkaSupervisorWithCustomIsTaskCurrent(
taskStorage,
taskMaster,
@ -3460,7 +3647,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions,
SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
DateTime minimumMessageTime,
DateTime maximumMessageTime
DateTime maximumMessageTime,
KafkaSupervisorTuningConfig tuningConfig
)
{
return createKafkaIndexTask(
@ -3470,7 +3658,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
endPartitions,
minimumMessageTime,
maximumMessageTime,
getDataSchema(dataSource)
getDataSchema(dataSource),
tuningConfig
);
}
@ -3481,7 +3670,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
DataSchema schema
DataSchema schema,
KafkaSupervisorTuningConfig tuningConfig
)
{
return createKafkaIndexTask(
@ -3492,7 +3682,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
minimumMessageTime,
maximumMessageTime,
schema,
(KafkaIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig()
tuningConfig.convertToTaskTuningConfig()
);
}

KinesisSupervisor.java

@ -56,7 +56,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -269,9 +268,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
Map<String, String> map
)
{
return new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, map, Collections.emptySet())
);
return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, map));
}
@Override

KinesisSupervisorTuningConfig.java

@ -21,7 +21,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@ -182,7 +181,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
}
@Override
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
public KinesisIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KinesisIndexTaskTuningConfig(
getMaxRowsInMemory(),

KinesisSupervisorTest.java

@ -296,9 +296,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
null
)
new KinesisDataSourceMetadata(null)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
replayAll();
@ -2459,6 +2457,59 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false);
supervisorRecordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream))
.andReturn(ImmutableSet.of(shardId1, shardId0))
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.getAssignment())
.andReturn(ImmutableSet.of(shard1Partition, shard0Partition))
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.reset(indexerMetadataStorageCoordinator);
// unknown DataSourceMetadata in metadata store
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(
new KinesisDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of("1", "100", "2", "200"))
)
).times(4);
// getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
// Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
// Instead, subsequent partitions will be reset in the following supervisor runs.
EasyMock.expect(
indexerMetadataStorageCoordinator.resetDataSourceMetadata(
DATASOURCE,
new KinesisDataSourceMetadata(
// Only one partition is reset in a single supervisor run.
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of("2", "200"))
)
)
).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
}
@Test
public void testResetRunningTasks() throws Exception
{
@ -3782,6 +3833,29 @@ public class KinesisSupervisorTest extends EasyMockSupport
Period earlyMessageRejectionPeriod,
boolean suspended
)
{
return getTestableSupervisor(
replicas,
taskCount,
useEarliestOffset,
false,
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended
);
}
private TestableKinesisSupervisor getTestableSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
boolean resetOffsetAutomatically,
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended
)
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
stream,
@ -3824,6 +3898,39 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
};
final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig(
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
resetOffsetAutomatically,
null,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
5000,
null,
null,
null,
null,
null,
null
);
return new TestableKinesisSupervisor(
taskStorage,
taskMaster,

SeekableStreamIndexTaskRunner.java

@ -1328,11 +1328,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
new ResetDataSourceMetadataAction(
task.getDataSource(),
createDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
new SeekableStreamEndSequenceNumbers<>(
ioConfig.getStartSequenceNumbers().getStream(),
partitionOffsetMap,
// Clear all exclusive start offsets for automatic reset
Collections.emptySet()
partitionOffsetMap
)
)
)

SeekableStreamSupervisor.java

@ -1149,16 +1149,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
activelyReadingTaskGroups.clear();
partitionGroups.clear();
} else {
if (!checkSourceMetadataMatch(dataSourceMetadata)) {
throw new IAE(
"Datasource metadata instance does not match required, found instance of [%s]",
dataSourceMetadata.getClass()
);
}
log.info("Reset dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata);
// Reset only the partitions in dataSourceMetadata if it has not been reset yet
@SuppressWarnings("unchecked")
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata = (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;
if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream())) {
// metadata can be null
@ -1171,19 +1172,22 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@SuppressWarnings("unchecked")
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata = (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;
// defend against consecutive reset requests from replicas
// as well as the case where the metadata store does not have an entry for the reset partitions
boolean doReset = false;
for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset : resetMetadata.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.entrySet()) {
for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset : resetMetadata
.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.entrySet()) {
final SequenceOffsetType partitionOffsetInMetadataStore = currentMetadata == null
? null
: currentMetadata.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.get(resetPartitionOffset.getKey());
: currentMetadata
.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.get(resetPartitionOffset.getKey());
final TaskGroup partitionTaskGroup = activelyReadingTaskGroups.get(
getTaskGroupIdForPartition(resetPartitionOffset.getKey())
);
@ -1225,7 +1229,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
"DataSourceMetadata is updated while reset"
);
activelyReadingTaskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
// killTaskGroupForPartitions() cleans up partitionGroups.
// Add the removed groups back.
partitionGroups.computeIfAbsent(groupId, k -> new ConcurrentHashMap<>())
.put(partition, getNotSetMarker());
});
} else {
throw new ISE("Unable to reset metadata");
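The other behavioral fix in resetInternal(): killTaskGroupForPartitions() removes the group's entry from partitionGroups (the new comment notes it "cleans up partitionGroups"), so the old code, which looked the entry up and rewrote every partition's marker in place, may no longer have anything to operate on. The new code re-creates the entry and marks only the partition actually being reset. A condensed sketch of the difference, reusing the names from the hunk above; the surrounding per-partition loop and the ConcurrentHashMap-valued partitionGroups field are assumed from context:

```java
// Old: assumes the group entry still exists and resets markers for every partition in the group.
//   partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());

// New: re-create the entry that killTaskGroupForPartitions() removed and reset only this partition.
partitionGroups.computeIfAbsent(groupId, k -> new ConcurrentHashMap<>())
               .put(partition, getNotSetMarker());
```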
@ -2472,7 +2479,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return notices.size();
}
private ImmutableMap<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(
private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(
int groupId
)
{
@ -2500,8 +2507,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
/**
* Queries the dataSource metadata table to see if there is a previous ending sequence for this partition. If it doesn't
* find any data, it will retrieve the latest or earliest Kafka/Kinesis sequence depending on the useEarliestOffset config.
* Queries the dataSource metadata table to see if there is a previous ending sequence for this partition. If it
* doesn't find any data, it will retrieve the latest or earliest Kafka/Kinesis sequence depending on the
* {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}.
*/
private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(PartitionIdType partition)
{
@ -2515,18 +2523,23 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
resetInternal(
createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
);
throw new StreamException(new ISE(
"Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting sequence",
sequence,
partition
));
throw new StreamException(
new ISE(
"Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting"
+ " sequence",
sequence,
partition
)
);
} else {
throw new StreamException(new ISE(
"Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous sequenceNumber and start reading from a valid message by using the supervisor's reset API.",
sequence,
partition
));
throw new StreamException(
new ISE(
"Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous"
+ " sequenceNumber and start reading from a valid message by using the supervisor's reset API.",
sequence,
partition
)
);
}
}
}
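The per-partition nature of this branch is what the new Kafka and Kinesis tests exercise: an automatic reset issued from getOffsetFromStorageForPartition() covers exactly one stale partition and then aborts the lookup with a StreamException, so the remaining stale partitions are only reset on later supervisor runs. Below is a condensed restatement of the branch from the last hunk; the metadata lookup, the availability check, and the non-error return path are clearly marked placeholders rather than code from the commit, and the tuning-config accessor name is assumed:

```java
// Condensed sketch of the automatic-reset branch above; the helper calls and the final return
// are placeholders for code that is not shown in this diff.
private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(PartitionIdType partition)
{
  final SequenceOffsetType sequence = previousEndingSequenceFromMetadataStore(partition); // placeholder
  if (sequence != null && !sequenceStillAvailableInStream(partition, sequence)) {         // placeholder
    if (tuningConfig.isResetOffsetAutomatically()) { // accessor name assumed
      // Reset exactly one partition, keyed by its end sequence number, then abort this lookup;
      // other stale partitions are reset on subsequent supervisor runs (what the new tests assert).
      resetInternal(createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence)));
      throw new StreamException(
          new ISE(
              "Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting"
              + " sequence",
              sequence,
              partition
          )
      );
    } else {
      throw new StreamException(
          new ISE(
              "Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous"
              + " sequenceNumber and start reading from a valid message by using the supervisor's reset API.",
              sequence,
              partition
          )
      );
    }
  }
  return makeSequenceNumberFor(sequence); // placeholder: stored sequence, else earliest/latest per useEarliestSequenceNumber
}
```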