Fix missing task type in task payload API. (#5399)

* Fix missing task type in task payload API.

Apparently embedding a polymorphic object inside a Map<String, Object> is
a bit too much for Jackson to serialize properly. Fix this by using
wrapper classes.

* Fix OverlordTest casts.

* Remove import.

* Remove unused imports.

* Clarify comments.
This commit is contained in:
Gian Merlino 2018-02-21 10:00:13 -08:00 committed by Fangjin Yang
parent 818ce51964
commit 3f72537787
5 changed files with 272 additions and 32 deletions

View File

@ -172,7 +172,12 @@ public class OverlordResource
}
catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", StringUtils.format("Task[%s] already exists!", task.getId())))
.entity(
ImmutableMap.of(
"error",
StringUtils.format("Task[%s] already exists!", task.getId())
)
)
.build();
}
}
@ -212,7 +217,16 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskPayload(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
final TaskPayloadResponse response = new TaskPayloadResponse(
taskid,
taskStorageQueryAdapter.getTask(taskid).orNull()
);
final Response.Status status = response.getPayload() == null
? Response.Status.NOT_FOUND
: Response.Status.OK;
return Response.status(status).entity(response).build();
}
@GET
@ -221,7 +235,16 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskStatus(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid));
final TaskStatusResponse response = new TaskStatusResponse(
taskid,
taskStorageQueryAdapter.getStatus(taskid).orNull()
);
final Response.Status status = response.getStatus() == null
? Response.Status.NOT_FOUND
: Response.Status.OK;
return Response.status(status).entity(response).build();
}
@GET
@ -575,14 +598,15 @@ public class OverlordResource
final List<TaskStatusPlus> completeTasks = recentlyFinishedTasks
.stream()
.map(status -> new TaskStatusPlus(
status.getId(),
taskFunction.apply(status.getId()).getType(),
taskStorageQueryAdapter.getCreatedTime(status.getId()),
// Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it.
DateTimes.EPOCH,
status.getStatusCode(),
status.getDuration(),
TaskLocation.unknown())
status.getId(),
taskFunction.apply(status.getId()).getType(),
taskStorageQueryAdapter.getCreatedTime(status.getId()),
// Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it.
DateTimes.EPOCH,
status.getStatusCode(),
status.getDuration(),
TaskLocation.unknown()
)
)
.collect(Collectors.toList());
@ -729,18 +753,6 @@ public class OverlordResource
);
}
private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
{
final Map<String, Object> results = Maps.newHashMap();
results.put("task", taskid);
if (x.isPresent()) {
results.put(objectType, x.get());
return Response.status(Response.Status.OK).entity(results).build();
} else {
return Response.status(Response.Status.NOT_FOUND).entity(results).build();
}
}
private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {

View File

@ -0,0 +1,83 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.common.task.Task;
import java.util.Objects;
public class TaskPayloadResponse
{
private final String task; // Task ID, named "task" in the JSONification of this class.
private final Task payload;
@JsonCreator
public TaskPayloadResponse(
@JsonProperty("task") final String task,
@JsonProperty("payload") final Task payload
)
{
this.task = task;
this.payload = payload;
}
@JsonProperty
public String getTask()
{
return task;
}
@JsonProperty
public Task getPayload()
{
return payload;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TaskPayloadResponse that = (TaskPayloadResponse) o;
return Objects.equals(task, that.task) &&
Objects.equals(payload, that.payload);
}
@Override
public int hashCode()
{
return Objects.hash(task, payload);
}
@Override
public String toString()
{
return "TaskPayloadResponse{" +
"task='" + task + '\'' +
", payload=" + payload +
'}';
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.common.TaskStatus;
import java.util.Objects;
public class TaskStatusResponse
{
private final String task; // Task ID, named "task" in the JSONification of this class.
private final TaskStatus status;
@JsonCreator
public TaskStatusResponse(
@JsonProperty("task") final String task,
@JsonProperty("status") final TaskStatus status
)
{
this.task = task;
this.status = status;
}
@JsonProperty
public String getTask()
{
return task;
}
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TaskStatusResponse that = (TaskStatusResponse) o;
return Objects.equals(task, that.task) &&
Objects.equals(status, that.status);
}
@Override
public int hashCode()
{
return Objects.hash(task, status);
}
@Override
public String toString()
{
return "TaskstatusResponse{" +
"task='" + task + '\'' +
", status=" + status +
'}';
}
}

View File

@ -39,6 +39,7 @@ import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.java.util.common.DateTimes;
import io.druid.segment.TestHelper;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig;
@ -87,7 +88,8 @@ public class OverlordResourceTest
Optional.of(taskRunner)
).anyTimes();
AuthorizerMapper authMapper = new AuthorizerMapper(null) {
AuthorizerMapper authMapper = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
@ -289,7 +291,12 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.isLeader()).andReturn(true);
EasyMock
.expect(indexerMetadataStorageAdapter.deletePendingSegments(EasyMock.eq("allow"), EasyMock.anyObject(Interval.class)))
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andReturn(2);
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
@ -300,6 +307,61 @@ public class OverlordResourceTest
Assert.assertEquals(2, response.get("numDeleted").intValue());
}
@Test
public void testGetTaskPayload() throws Exception
{
expectAuthorizationTokenCheck();
final NoopTask task = NoopTask.create("mydatasource");
EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
.andReturn(Optional.of(task));
EasyMock.expect(taskStorageQueryAdapter.getTask("othertask"))
.andReturn(Optional.absent());
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
final Response response1 = overlordResource.getTaskPayload("mytask");
final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue(
TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
TaskPayloadResponse.class
);
Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1);
final Response response2 = overlordResource.getTaskPayload("othertask");
final TaskPayloadResponse taskPayloadResponse2 = TestHelper.makeJsonMapper().readValue(
TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
TaskPayloadResponse.class
);
Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2);
}
@Test
public void testGetTaskStatus() throws Exception
{
expectAuthorizationTokenCheck();
EasyMock.expect(taskStorageQueryAdapter.getStatus("mytask"))
.andReturn(Optional.of(TaskStatus.success("mytask")));
EasyMock.expect(taskStorageQueryAdapter.getStatus("othertask"))
.andReturn(Optional.absent());
EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
final Response response1 = overlordResource.getTaskStatus("mytask");
final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue(
TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
TaskStatusResponse.class
);
Assert.assertEquals(new TaskStatusResponse("mytask", TaskStatus.success("mytask")), taskStatusResponse1);
final Response response2 = overlordResource.getTaskStatus("othertask");
final TaskStatusResponse taskStatusResponse2 = TestHelper.makeJsonMapper().readValue(
TestHelper.makeJsonMapper().writeValueAsString(response2.getEntity()),
TaskStatusResponse.class
);
Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2);
}
@After
public void tearDown()
{

View File

@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
@ -56,6 +54,8 @@ import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.metrics.NoopServiceEmitter;
@ -80,7 +80,6 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@ -237,7 +236,7 @@ public class OverlordTest
// Task payload for task_0 should be present in taskStorage
response = overlordResource.getTaskPayload(taskId_0);
Assert.assertEquals(task_0, ((Map) response.getEntity()).get("payload"));
Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload());
// Task not present in taskStorage - should fail
response = overlordResource.getTaskPayload("whatever");
@ -245,10 +244,10 @@ public class OverlordTest
// Task status of the submitted task should be running
response = overlordResource.getTaskStatus(taskId_0);
Assert.assertEquals(taskId_0, ((Map) response.getEntity()).get("task"));
Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
Assert.assertEquals(
TaskStatus.running(taskId_0).getStatusCode(),
((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode()
((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
);
// Simulate completion of task_0
@ -296,7 +295,7 @@ public class OverlordTest
{
while (true) {
Response response = overlordResource.getTaskStatus(taskId);
if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) {
if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
break;
}
Thread.sleep(10);