From 5b1ae21bd1de819c4fceb3daa483d900e688651c Mon Sep 17 00:00:00 2001 From: David Lim Date: Tue, 6 Sep 2016 15:02:22 -0600 Subject: [PATCH] retry calls to getStartTime (#3429) --- .../indexing/kafka/KafkaIndexTaskClient.java | 4 ++-- .../kafka/supervisor/KafkaSupervisor.java | 2 +- .../kafka/KafkaIndexTaskClientTest.java | 11 ++++++++--- .../kafka/supervisor/KafkaSupervisorTest.java | 17 ++++++++--------- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 19d58414cff..e2bf2b99b42 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -150,10 +150,10 @@ public class KafkaIndexTaskClient } } - public DateTime getStartTime(String id, boolean retry) + public DateTime getStartTime(String id) { try { - final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "time/start", null, retry); + final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "time/start", null, true); return response.getContent() == null || response.getContent().isEmpty() ? null : jsonMapper.readValue(response.getContent(), DateTime.class); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 3dccbc79037..0f4791c708d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1215,7 +1215,7 @@ public class KafkaSupervisor implements Supervisor private DateTime getTaskStartTime(final String id) { if (!taskInfoProvider.getTaskLocation(id).equals(TaskLocation.unknown())) { - DateTime startTime = taskClient.getStartTime(id, false); + DateTime startTime = taskClient.getStartTime(id); log.debug("Received start time of [%s] from task [%s]", startTime, id); return startTime; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index ec02e57dc46..1cc9b93a609 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -271,17 +271,22 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport @Test public void testGetStartTime() throws Exception { + client = new RetryingTestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider); DateTime now = DateTime.now(); Capture captured = Capture.newInstance(); - expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); + expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) + .andReturn(HttpResponseStatus.OK); + expect(responseHolder.getResponse()).andReturn(response); + expect(response.headers()).andReturn(headers); + expect(headers.get("X-Druid-Task-Id")).andReturn(null); expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes(); expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn( Futures.immediateFuture(responseHolder) - ); + ).times(2); replayAll(); - DateTime results = client.getStartTime(TEST_ID, false); + DateTime results = client.getStartTime(TEST_ID); verifyAll(); Request request = captured.getValue(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index db034449aee..abe0332bac9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -95,7 +95,6 @@ import java.util.concurrent.Executor; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; @@ -487,7 +486,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes(); - expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")) .anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -570,7 +569,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskStorage.getTask("id4")).andReturn(Optional.of(id3)).anyTimes(); expect(taskStorage.getTask("id5")).andReturn(Optional.of(id3)).anyTimes(); expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes(); - expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")) .anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -600,7 +599,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes(); - expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")) .anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -682,7 +681,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); - expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")) .anyTimes(); expect(taskQueue.add(capture(captured))).andReturn(true); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( @@ -741,7 +740,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes(); - expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")) .anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -839,10 +838,10 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskClient.getStatus(anyString())) .andReturn(KafkaIndexTask.Status.READING) .anyTimes(); - expect(taskClient.getStartTime(EasyMock.contains("sequenceName-0"), eq(false))) + expect(taskClient.getStartTime(EasyMock.contains("sequenceName-0"))) .andReturn(DateTime.now().minusMinutes(2)) .andReturn(DateTime.now()); - expect(taskClient.getStartTime(EasyMock.contains("sequenceName-1"), eq(false))) + expect(taskClient.getStartTime(EasyMock.contains("sequenceName-1"))) .andReturn(DateTime.now()) .times(2); expect(taskClient.pause(EasyMock.contains("sequenceName-0"))) @@ -1101,7 +1100,7 @@ public class KafkaSupervisorTest extends EasyMockSupport ).anyTimes(); expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING); expect(taskClient.getStatus("id2")).andReturn(KafkaIndexTask.Status.READING); - expect(taskClient.getStartTime("id2", false)).andReturn(startTime); + expect(taskClient.getStartTime("id2")).andReturn(startTime); expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); expect(taskClient.getCurrentOffsets("id2", false)).andReturn(ImmutableMap.of(0, 40L, 1, 50L, 2, 60L));