retry calls to getStartTime (#3429)

This commit is contained in:
David Lim 2016-09-06 15:02:22 -06:00 committed by Gian Merlino
parent 3a97fd4d6c
commit 5b1ae21bd1
4 changed files with 19 additions and 15 deletions

View File

@ -150,10 +150,10 @@ public class KafkaIndexTaskClient
}
}
public DateTime getStartTime(String id, boolean retry)
public DateTime getStartTime(String id)
{
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "time/start", null, retry);
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "time/start", null, true);
return response.getContent() == null || response.getContent().isEmpty()
? null
: jsonMapper.readValue(response.getContent(), DateTime.class);

View File

@ -1215,7 +1215,7 @@ public class KafkaSupervisor implements Supervisor
private DateTime getTaskStartTime(final String id)
{
if (!taskInfoProvider.getTaskLocation(id).equals(TaskLocation.unknown())) {
DateTime startTime = taskClient.getStartTime(id, false);
DateTime startTime = taskClient.getStartTime(id);
log.debug("Received start time of [%s] from task [%s]", startTime, id);
return startTime;
}

View File

@ -271,17 +271,22 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
@Test
public void testGetStartTime() throws Exception
{
client = new RetryingTestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider);
DateTime now = DateTime.now();
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
.andReturn(HttpResponseStatus.OK);
expect(responseHolder.getResponse()).andReturn(response);
expect(response.headers()).andReturn(headers);
expect(headers.get("X-Druid-Task-Id")).andReturn(null);
expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
Futures.immediateFuture(responseHolder)
);
).times(2);
replayAll();
DateTime results = client.getStartTime(TEST_ID, false);
DateTime results = client.getStartTime(TEST_ID);
verifyAll();
Request request = captured.getValue();

View File

@ -95,7 +95,6 @@ import java.util.concurrent.Executor;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
@ -487,7 +486,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -570,7 +569,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask("id4")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id5")).andReturn(Optional.of(id3)).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -600,7 +599,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -682,7 +681,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true);
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
@ -741,7 +740,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -839,10 +838,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatus(anyString()))
.andReturn(KafkaIndexTask.Status.READING)
.anyTimes();
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-0"), eq(false)))
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-0")))
.andReturn(DateTime.now().minusMinutes(2))
.andReturn(DateTime.now());
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-1"), eq(false)))
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-1")))
.andReturn(DateTime.now())
.times(2);
expect(taskClient.pause(EasyMock.contains("sequenceName-0")))
@ -1101,7 +1100,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
).anyTimes();
expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING);
expect(taskClient.getStatus("id2")).andReturn(KafkaIndexTask.Status.READING);
expect(taskClient.getStartTime("id2", false)).andReturn(startTime);
expect(taskClient.getStartTime("id2")).andReturn(startTime);
expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCurrentOffsets("id2", false)).andReturn(ImmutableMap.of(0, 40L, 1, 50L, 2, 60L));