diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 33866d5887c..21dcaa15aac 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -172,7 +172,12 @@ public class OverlordResource } catch (EntryExistsException e) { return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", StringUtils.format("Task[%s] already exists!", task.getId()))) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("Task[%s] already exists!", task.getId()) + ) + ) .build(); } } @@ -212,7 +217,16 @@ public class OverlordResource @ResourceFilters(TaskResourceFilter.class) public Response getTaskPayload(@PathParam("taskid") String taskid) { - return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + final TaskPayloadResponse response = new TaskPayloadResponse( + taskid, + taskStorageQueryAdapter.getTask(taskid).orNull() + ); + + final Response.Status status = response.getPayload() == null + ? Response.Status.NOT_FOUND + : Response.Status.OK; + + return Response.status(status).entity(response).build(); } @GET @@ -221,7 +235,16 @@ public class OverlordResource @ResourceFilters(TaskResourceFilter.class) public Response getTaskStatus(@PathParam("taskid") String taskid) { - return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid)); + final TaskStatusResponse response = new TaskStatusResponse( + taskid, + taskStorageQueryAdapter.getStatus(taskid).orNull() + ); + + final Response.Status status = response.getStatus() == null + ? Response.Status.NOT_FOUND + : Response.Status.OK; + + return Response.status(status).entity(response).build(); } @GET @@ -575,14 +598,15 @@ public class OverlordResource final List completeTasks = recentlyFinishedTasks .stream() .map(status -> new TaskStatusPlus( - status.getId(), - taskFunction.apply(status.getId()).getType(), - taskStorageQueryAdapter.getCreatedTime(status.getId()), - // Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it. - DateTimes.EPOCH, - status.getStatusCode(), - status.getDuration(), - TaskLocation.unknown()) + status.getId(), + taskFunction.apply(status.getId()).getType(), + taskStorageQueryAdapter.getCreatedTime(status.getId()), + // Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it. + DateTimes.EPOCH, + status.getStatusCode(), + status.getDuration(), + TaskLocation.unknown() + ) ) .collect(Collectors.toList()); @@ -729,18 +753,6 @@ public class OverlordResource ); } - private Response optionalTaskResponse(String taskid, String objectType, Optional x) - { - final Map results = Maps.newHashMap(); - results.put("task", taskid); - if (x.isPresent()) { - results.put(objectType, x.get()); - return Response.status(Response.Status.OK).entity(results).build(); - } else { - return Response.status(Response.Status.NOT_FOUND).entity(results).build(); - } - } - private Response asLeaderWith(Optional x, Function f) { if (x.isPresent()) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java new file mode 100644 index 00000000000..1e7b4e9a544 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskPayloadResponse.java @@ -0,0 +1,83 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexing.common.task.Task; + +import java.util.Objects; + +public class TaskPayloadResponse +{ + private final String task; // Task ID, named "task" in the JSONification of this class. + private final Task payload; + + @JsonCreator + public TaskPayloadResponse( + @JsonProperty("task") final String task, + @JsonProperty("payload") final Task payload + ) + { + this.task = task; + this.payload = payload; + } + + @JsonProperty + public String getTask() + { + return task; + } + + @JsonProperty + public Task getPayload() + { + return payload; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskPayloadResponse that = (TaskPayloadResponse) o; + return Objects.equals(task, that.task) && + Objects.equals(payload, that.payload); + } + + @Override + public int hashCode() + { + return Objects.hash(task, payload); + } + + @Override + public String toString() + { + return "TaskPayloadResponse{" + + "task='" + task + '\'' + + ", payload=" + payload + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java new file mode 100644 index 00000000000..b15aeb0cc0a --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/TaskStatusResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexing.common.TaskStatus; + +import java.util.Objects; + +public class TaskStatusResponse +{ + private final String task; // Task ID, named "task" in the JSONification of this class. + private final TaskStatus status; + + @JsonCreator + public TaskStatusResponse( + @JsonProperty("task") final String task, + @JsonProperty("status") final TaskStatus status + ) + { + this.task = task; + this.status = status; + } + + @JsonProperty + public String getTask() + { + return task; + } + + @JsonProperty + public TaskStatus getStatus() + { + return status; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskStatusResponse that = (TaskStatusResponse) o; + return Objects.equals(task, that.task) && + Objects.equals(status, that.status); + } + + @Override + public int hashCode() + { + + return Objects.hash(task, status); + } + + @Override + public String toString() + { + return "TaskstatusResponse{" + + "task='" + task + '\'' + + ", status=" + status + + '}'; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 817d6743df9..3fc45c2f6b9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -39,6 +39,7 @@ import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.java.util.common.DateTimes; +import io.druid.segment.TestHelper; import io.druid.server.security.Access; import io.druid.server.security.Action; import io.druid.server.security.AuthConfig; @@ -87,7 +88,8 @@ public class OverlordResourceTest Optional.of(taskRunner) ).anyTimes(); - AuthorizerMapper authMapper = new AuthorizerMapper(null) { + AuthorizerMapper authMapper = new AuthorizerMapper(null) + { @Override public Authorizer getAuthorizer(String name) { @@ -289,7 +291,12 @@ public class OverlordResourceTest EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock - .expect(indexerMetadataStorageAdapter.deletePendingSegments(EasyMock.eq("allow"), EasyMock.anyObject(Interval.class))) + .expect( + indexerMetadataStorageAdapter.deletePendingSegments( + EasyMock.eq("allow"), + EasyMock.anyObject(Interval.class) + ) + ) .andReturn(2); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); @@ -300,6 +307,61 @@ public class OverlordResourceTest Assert.assertEquals(2, response.get("numDeleted").intValue()); } + @Test + public void testGetTaskPayload() throws Exception + { + expectAuthorizationTokenCheck(); + final NoopTask task = NoopTask.create("mydatasource"); + EasyMock.expect(taskStorageQueryAdapter.getTask("mytask")) + .andReturn(Optional.of(task)); + + EasyMock.expect(taskStorageQueryAdapter.getTask("othertask")) + .andReturn(Optional.absent()); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + + final Response response1 = overlordResource.getTaskPayload("mytask"); + final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()), + TaskPayloadResponse.class + ); + Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1); + + final Response response2 = overlordResource.getTaskPayload("othertask"); + final TaskPayloadResponse taskPayloadResponse2 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()), + TaskPayloadResponse.class + ); + Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2); + } + + @Test + public void testGetTaskStatus() throws Exception + { + expectAuthorizationTokenCheck(); + EasyMock.expect(taskStorageQueryAdapter.getStatus("mytask")) + .andReturn(Optional.of(TaskStatus.success("mytask"))); + + EasyMock.expect(taskStorageQueryAdapter.getStatus("othertask")) + .andReturn(Optional.absent()); + + EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); + + final Response response1 = overlordResource.getTaskStatus("mytask"); + final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()), + TaskStatusResponse.class + ); + Assert.assertEquals(new TaskStatusResponse("mytask", TaskStatus.success("mytask")), taskStatusResponse1); + + final Response response2 = overlordResource.getTaskStatus("othertask"); + final TaskStatusResponse taskStatusResponse2 = TestHelper.makeJsonMapper().readValue( + TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()), + TaskStatusResponse.class + ); + Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2); + } + @After public void tearDown() { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 8142185e26e..b73c59bc884 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.discovery.DruidLeaderSelector; @@ -56,6 +54,8 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.server.DruidNode; import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -80,7 +80,6 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -237,7 +236,7 @@ public class OverlordTest // Task payload for task_0 should be present in taskStorage response = overlordResource.getTaskPayload(taskId_0); - Assert.assertEquals(task_0, ((Map) response.getEntity()).get("payload")); + Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload()); // Task not present in taskStorage - should fail response = overlordResource.getTaskPayload("whatever"); @@ -245,10 +244,10 @@ public class OverlordTest // Task status of the submitted task should be running response = overlordResource.getTaskStatus(taskId_0); - Assert.assertEquals(taskId_0, ((Map) response.getEntity()).get("task")); + Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask()); Assert.assertEquals( TaskStatus.running(taskId_0).getStatusCode(), - ((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode() + ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode() ); // Simulate completion of task_0 @@ -296,7 +295,7 @@ public class OverlordTest { while (true) { Response response = overlordResource.getTaskStatus(taskId); - if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) { + if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) { break; } Thread.sleep(10);