diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index e0289f388b8..a06ed7f8f3b 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -222,6 +222,24 @@
 GET /druid/indexer/v1/supervisor/<supervisorId>/history
 ```
 Returns an audit history of specs for the supervisor with the provided ID.
+#### Reset Supervisor
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/reset
+```
+The indexing service keeps track of the latest persisted Kafka offsets in order to provide exactly-once ingestion
+guarantees across tasks. Subsequent tasks must start reading from where the previous task completed in order for the
+generated segments to be accepted. If the messages at the expected starting offsets are no longer available in Kafka
+(typically because the message retention period has elapsed or the topic was removed and re-created), the supervisor will
+refuse to start and in-flight tasks will fail.
+
+This endpoint can be used to clear the stored offsets, which will cause the supervisor to start reading from
+either the earliest or latest offsets in Kafka (depending on the value of `useEarliestOffset`). The supervisor must be
+running for this endpoint to be available. After the stored offsets are cleared, the supervisor will automatically kill
+and re-create any active tasks so that tasks begin reading from valid offsets.
+
+Note that since the stored offsets are necessary to guarantee exactly-once ingestion, resetting them with this endpoint
+may cause some Kafka messages to be skipped or to be read twice.
+
 ## Capacity Planning
 
 Kafka indexing tasks run on middle managers and are thus limited by the resources available in the middle manager

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b61b6834984..8009fc93294 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -398,6 +398,12 @@ public class KafkaSupervisor implements Supervisor
     return generateReport(true);
   }
 
+  @Override
+  public void reset()
+  {
+    log.info("Posting ResetNotice");
+    notices.add(new ResetNotice());
+  }
+
   public void possiblyRegisterListener()
   {
     // getTaskRunner() sometimes fails if the task queue is still being initialized so retry later until we succeed
@@ -479,6 +485,33 @@
     }
   }
 
+  private class ResetNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      resetInternal();
+    }
+  }
+
+  @VisibleForTesting
+  void resetInternal()
+  {
+    boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
+    log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
[%s]", dataSource, result); + + for (TaskGroup taskGroup : taskGroups.values()) { + for (Map.Entry entry : taskGroup.tasks.entrySet()) { + String taskId = entry.getKey(); + log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId); + killTask(taskId); + } + } + + partitionGroups.clear(); + taskGroups.clear(); + } + @VisibleForTesting void gracefulShutdownInternal() throws ExecutionException, InterruptedException { @@ -1289,8 +1322,9 @@ public class KafkaSupervisor implements Supervisor long latestKafkaOffset = getOffsetFromKafkaForPartition(partition, false); if (offset > latestKafkaOffset) { throw new ISE( - "Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition [%d]. If your Kafka offsets have" - + " been reset, you will need to remove the entry for [%s] from the dataSource table.", + "Offset in metadata storage [%,d] > latest Kafka offset [%,d] for partition[%d] dataSource[%s]. If these " + + "messages are no longer available (perhaps you deleted and re-created your Kafka topic) you can use the " + + "supervisor reset API to restart ingestion.", offset, latestKafkaOffset, partition, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 93850ac2de8..ec0565ac701 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1459,6 +1459,110 @@ public class KafkaSupervisorTest extends EasyMockSupport verifyAll(); } + @Test + public void testResetNoTasks() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + reset(indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); + replay(indexerMetadataStorageCoordinator); + + supervisor.resetInternal(); + verifyAll(); + + } + + @Test + public void testResetRunningTasks() throws Exception + { + final TaskLocation location1 = new TaskLocation("testHost", 1234); + final TaskLocation location2 = new TaskLocation("testHost2", 145); + final DateTime startTime = new DateTime(); + + supervisor = getSupervisor(2, 1, true, "PT1H", null); + addSomeEvents(1); + + Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null + ); + + Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null + ); + + Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 
"sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null + ); + + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(id1.getId(), null, location1)); + workItems.add(new TestTaskRunnerWorkItem(id2.getId(), null, location2)); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + reset(taskQueue, indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); + taskQueue.shutdown("id2"); + taskQueue.shutdown("id3"); + replay(taskQueue, indexerMetadataStorageCoordinator); + + supervisor.resetInternal(); + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 7a9931cc742..5cf69ea40d6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -138,6 +138,21 @@ public class SupervisorManager return supervisor == null ? 
+  public boolean resetSupervisor(String id)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(id, "id");
+
+    Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
+
+    if (supervisor == null) {
+      return false;
+    }
+
+    supervisor.lhs.reset();
+    return true;
+  }
+
   /**
    * Stops a supervisor with a given id and then removes it from the list.
    *

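Reviewer note: as the `KafkaSupervisor` change above shows, `reset()` is deliberately asynchronous — it only enqueues a `ResetNotice`, and `resetInternal()` runs later on the supervisor's single notice-handling thread, so the metadata delete and task kills never race with the supervisor's regular run loop. A minimal stand-alone sketch of that handoff pattern (the class and loop below are an illustrative model, not code from this patch):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of KafkaSupervisor's notice queue: HTTP threads enqueue
// work, and a single consumer thread executes it, so handlers never overlap.
class NoticeQueueSketch
{
  interface Notice
  {
    void handle();
  }

  private final BlockingQueue<Notice> notices = new LinkedBlockingQueue<>();

  // Called from the HTTP request thread; cheap and non-blocking.
  void reset()
  {
    notices.add(() -> System.out.println("resetInternal() would run here"));
  }

  // Runs on the supervisor's exec thread; processes notices one at a time.
  void runLoop() throws InterruptedException
  {
    while (!Thread.currentThread().isInterrupted()) {
      notices.take().handle();
    }
  }
}
```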
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
index c96387640dc..018afa99240 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -206,6 +206,29 @@ public class SupervisorResource
     );
   }
 
+  @POST
+  @Path("/{id}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response reset(@PathParam("id") final String id)
+  {
+    return asLeaderWithSupervisorManager(
+        new Function<SupervisorManager, Response>()
+        {
+          @Override
+          public Response apply(SupervisorManager manager)
+          {
+            if (manager.resetSupervisor(id)) {
+              return Response.ok(ImmutableMap.of("id", id)).build();
+            } else {
+              return Response.status(Response.Status.NOT_FOUND)
+                             .entity(ImmutableMap.of("error", String.format("[%s] does not exist", id)))
+                             .build();
+            }
+          }
+        }
+    );
+  }
+
   private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Response> f)
   {
     Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();

diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 9fef185a751..0861bfb77f8 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -249,6 +249,25 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testResetSupervisor() throws Exception
+  {
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", new TestSupervisorSpec("id1", supervisor1)
+    );
+
+    EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    supervisor1.reset();
+    replayAll();
+
+    manager.start();
+    Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1"));
+    Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home"));
+
+    verifyAll();
+  }
+
   private class TestSupervisorSpec implements SupervisorSpec
   {
     private final String id;

diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index f63f1fdeb59..0eff330a801 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -267,6 +267,35 @@ public class SupervisorResourceTest extends EasyMockSupport
     Assert.assertEquals(503, response.getStatus());
   }
 
+  @Test
+  public void testReset() throws Exception
+  {
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
+    EasyMock.expect(supervisorManager.resetSupervisor("my-id")).andReturn(true);
+    EasyMock.expect(supervisorManager.resetSupervisor("my-id-2")).andReturn(false);
+    replayAll();
+
+    Response response = supervisorResource.reset("my-id");
+
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+
+    response = supervisorResource.reset("my-id-2");
+
+    Assert.assertEquals(404, response.getStatus());
+    verifyAll();
+
+    resetAll();
+
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.<SupervisorManager>absent());
+    replayAll();
+
+    response = supervisorResource.reset("my-id");
+
+    Assert.assertEquals(503, response.getStatus());
+    verifyAll();
+  }
+
   private class TestSupervisorSpec implements SupervisorSpec
   {
     private final String id;

diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 3c2bf2917ee..b6c13997f45 100644
--- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -51,6 +51,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public boolean deleteDataSourceMetadata(String dataSource)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
   {

diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index d20e5cdc01a..808bd35506d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -54,7 +54,7 @@ public interface IndexerMetadataStorageCoordinator
    *
    * @throws IOException
    */
-  public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
+  List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
       throws IOException;
 
   /**
@@ -120,6 +120,14 @@
   DataSourceMetadata getDataSourceMetadata(String dataSource);
 
+  /**
+   * Removes entry for 'dataSource' from the dataSource metadata table.
+   *
+   * @param dataSource identifier
+   * @return true if the entry was deleted, false otherwise
+   */
+  boolean deleteDataSourceMetadata(String dataSource);
+
   void updateSegmentMetadata(Set<DataSegment> segments) throws IOException;
 
   void deleteSegments(Set<DataSegment> segments) throws IOException;

diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index f3c7937c3df..837c266655c 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -46,6 +46,9 @@ public class NoopSupervisorSpec implements SupervisorSpec
       {
        return null;
       }
+
+      @Override
+      public void reset() {}
     };
   }
 }

diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index c32804b343e..6f64d486a4d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -32,4 +32,6 @@
   void stop(boolean stopGracefully);
 
   SupervisorReport getStatus();
+
+  void reset();
 }

diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 3ce09c1a5a9..6f4de4b2a33 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -772,6 +772,26 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     return retVal;
   }
 
+  public boolean deleteDataSourceMetadata(final String dataSource)
+  {
+    return connector.retryWithHandle(
+        new HandleCallback<Boolean>()
+        {
+          @Override
+          public Boolean withHandle(Handle handle) throws Exception
+          {
+            int rows = handle.createStatement(
+                String.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable())
+            )
+                             .bind("dataSource", dataSource)
+                             .execute();
+
+            return rows > 0;
+          }
+        }
+    );
+  }
+
   public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
   {
     connector.getDBI().inTransaction(

diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 3af817ed8d7..2b53428902e 100644
--- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -523,4 +523,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       )
     );
   }
+
+  @Test
+  public void testDeleteDataSourceMetadata() throws IOException
+  {
+    coordinator.announceHistoricalSegments(
+        ImmutableSet.of(defaultSegment),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
+    );
+
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.getDataSourceMetadata("fooDataSource")
+    );
+
+    Assert.assertFalse("deleteInvalidDataSourceMetadata", coordinator.deleteDataSourceMetadata("nonExistentDS"));
+    Assert.assertTrue("deleteValidDataSourceMetadata", coordinator.deleteDataSourceMetadata("fooDataSource"));
+
+    Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource"));
+  }
 }
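For completeness, here is one way to exercise the new endpoint once the patch is applied — a hypothetical client snippet, assuming an overlord listening on `localhost:8090` and a running supervisor with ID `my-supervisor` (neither is defined by this patch):

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class ResetSupervisorExample
{
  public static void main(String[] args) throws IOException
  {
    // POST /druid/indexer/v1/supervisor/<supervisorId>/reset, per the docs change above.
    URL url = new URL("http://localhost:8090/druid/indexer/v1/supervisor/my-supervisor/reset");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");

    // Expected responses: 200 with {"id":"my-supervisor"} on success,
    // 404 if no such supervisor is running, 503 if this overlord is not the leader.
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}
```

Because the reset is handled asynchronously through the supervisor's notice queue, a 200 response means the reset was accepted, not that the stored offsets have already been cleared.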