add supervisor reset API (#3484)

* add supervisor reset API

* CR doc changes and kill running tasks / clear offsets from supervisor
David Lim 2016-09-22 18:51:06 -06:00 committed by Gian Merlino
parent 6099d20303
commit ca9114b41b
13 changed files with 304 additions and 3 deletions


@@ -222,6 +222,24 @@ GET /druid/indexer/v1/supervisor/<supervisorId>/history
```
Returns an audit history of specs for the supervisor with the provided ID.
#### Reset Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/reset
```
The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
(typically because the message retention period has elapsed or the topic was removed and re-created), the supervisor will
refuse to start and in-flight tasks will fail.
This endpoint can be used to clear the stored offsets, which will cause the supervisor to start reading from
either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
and re-create any active tasks so that tasks begin reading from valid offsets.
Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
may cause some Kafka messages to be skipped or to be read twice.
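For example, a reset request for a hypothetical supervisor registered with the ID `my-supervisor` would be issued as:
```
POST /druid/indexer/v1/supervisor/my-supervisor/reset
```
On success the endpoint returns `200 OK` with the supervisor ID in the body (e.g. `{"id":"my-supervisor"}`); if no supervisor with that ID is running, it returns `404 Not Found` with an error message such as `{"error":"[my-supervisor] does not exist"}`.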
## Capacity Planning
Kafka indexing tasks run on middle managers and are thus limited by the resources available in the middle manager


@@ -398,6 +398,12 @@ public class KafkaSupervisor implements Supervisor
return generateReport(true);
}
@Override
public void reset()
{
log.info("Posting ResetNotice");
notices.add(new ResetNotice());
}
public void possiblyRegisterListener()
{
// getTaskRunner() sometimes fails if the task queue is still being initialized so retry later until we succeed
@@ -479,6 +485,33 @@ public class KafkaSupervisor implements Supervisor
}
}
private class ResetNotice implements Notice
{
@Override
public void handle()
{
resetInternal();
}
}
@VisibleForTesting
void resetInternal()
{
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
}
}
partitionGroups.clear();
taskGroups.clear();
}
@VisibleForTesting
void gracefulShutdownInternal() throws ExecutionException, InterruptedException
{
@@ -1289,8 +1322,9 @@ public class KafkaSupervisor implements Supervisor
long latestKafkaOffset = getOffsetFromKafkaForPartition(partition, false);
if (offset > latestKafkaOffset) {
throw new ISE(
"Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition [%d]. If your Kafka offsets have"
+ " been reset, you will need to remove the entry for [%s] from the dataSource table.",
"Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition[%d] dataSource[%s]. If these "
+ "messages are no longer available (perhaps you deleted and re-created your Kafka topic) you can use the "
+ "supervisor reset API to restart ingestion.",
offset,
latestKafkaOffset,
partition,


@@ -1459,6 +1459,110 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testResetNoTasks() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
reset(indexerMetadataStorageCoordinator);
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
replay(indexerMetadataStorageCoordinator);
supervisor.resetInternal();
verifyAll();
}
@Test
public void testResetRunningTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234);
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(2, 1, true, "PT1H", null);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1.getId(), null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2.getId(), null, location2));
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
reset(taskQueue, indexerMetadataStorageCoordinator);
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
taskQueue.shutdown("id2");
taskQueue.shutdown("id3");
replay(taskQueue, indexerMetadataStorageCoordinator);
supervisor.resetInternal();
verifyAll();
}
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {


@@ -138,6 +138,21 @@ public class SupervisorManager
return supervisor == null ? Optional.<SupervisorReport>absent() : Optional.fromNullable(supervisor.lhs.getStatus());
}
public boolean resetSupervisor(String id)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
if (supervisor == null) {
return false;
}
supervisor.lhs.reset();
return true;
}
/**
* Stops a supervisor with a given id and then removes it from the list.
* <p>


@@ -206,6 +206,29 @@ public class SupervisorResource
);
}
@POST
@Path("/{id}/reset")
@Produces(MediaType.APPLICATION_JSON)
public Response reset(@PathParam("id") final String id)
{
return asLeaderWithSupervisorManager(
new Function<SupervisorManager, Response>()
{
@Override
public Response apply(SupervisorManager manager)
{
if (manager.resetSupervisor(id)) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", String.format("[%s] does not exist", id)))
.build();
}
}
}
);
}
private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Response> f)
{
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();


@@ -249,6 +249,25 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
@Test
public void testResetSupervisor() throws Exception
{
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.<String, SupervisorSpec>of(
"id1", new TestSupervisorSpec("id1", supervisor1)
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.reset();
replayAll();
manager.start();
Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1"));
Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home"));
verifyAll();
}
private class TestSupervisorSpec implements SupervisorSpec
{
private final String id;


@@ -267,6 +267,35 @@ public class SupervisorResourceTest extends EasyMockSupport
Assert.assertEquals(503, response.getStatus());
}
@Test
public void testReset() throws Exception
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.resetSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.resetSupervisor("my-id-2")).andReturn(false);
replayAll();
Response response = supervisorResource.reset("my-id");
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
response = supervisorResource.reset("my-id-2");
Assert.assertEquals(404, response.getStatus());
verifyAll();
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.<SupervisorManager>absent());
replayAll();
response = supervisorResource.reset("my-id");
Assert.assertEquals(503, response.getStatus());
verifyAll();
}
private class TestSupervisorSpec implements SupervisorSpec
{
private final String id;


@@ -51,6 +51,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
throw new UnsupportedOperationException();
}
@Override
public boolean deleteDataSourceMetadata(String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{


@@ -54,7 +54,7 @@ public interface IndexerMetadataStorageCoordinator
*
* @throws IOException
*/
- public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
+ List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
throws IOException;
/**
@@ -120,6 +120,14 @@ public interface IndexerMetadataStorageCoordinator
DataSourceMetadata getDataSourceMetadata(String dataSource);
/**
* Removes entry for 'dataSource' from the dataSource metadata table.
*
* @param dataSource identifier
* @return true if the entry was deleted, false otherwise
*/
boolean deleteDataSourceMetadata(String dataSource);
void updateSegmentMetadata(Set<DataSegment> segments) throws IOException;
void deleteSegments(Set<DataSegment> segments) throws IOException;


@@ -46,6 +46,9 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
return null;
}
@Override
public void reset() {}
};
}
}


@@ -32,4 +32,6 @@ public interface Supervisor
void stop(boolean stopGracefully);
SupervisorReport getStatus();
void reset();
}


@@ -772,6 +772,26 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
return retVal;
}
public boolean deleteDataSourceMetadata(final String dataSource)
{
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
int rows = handle.createStatement(
String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable())
)
.bind("dataSource", dataSource)
.execute();
return rows > 0;
}
}
);
}
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
connector.getDBI().inTransaction(


@@ -523,4 +523,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
)
);
}
@Test
public void testDeleteDataSourceMetadata() throws IOException
{
coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
Assert.assertEquals(
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
coordinator.getDataSourceMetadata("fooDataSource")
);
Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS"));
Assert.assertTrue("deleteValidDataSourceMetadata", coordinator.deleteDataSourceMetadata("fooDataSource"));
Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource"));
}
}