Return HTTP 404 instead of 400 for supervisor/task endpoints (#11724)

* Use 404 instead of 400

* Use 404 instead of 400

* Add UT test cases

* Add IT testcases

* add UT for task resource filter

Signed-off-by: frank chen <frank.chen021@outlook.com>

* Using org.testing.Assert instead of org.junit.Assert

* Resolve comments and fix test

* Fix test

* Fix tests

* Resolve comments
This commit is contained in:
Frank Chen 2021-11-25 13:09:47 +08:00 committed by GitHub
parent 2c08055962
commit 98957be044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 486 additions and 7 deletions

View File

@ -78,7 +78,7 @@ public class SupervisorResourceFilter extends AbstractResourceFilter
Optional<SupervisorSpec> supervisorSpecOptional = supervisorManager.getSupervisorSpec(supervisorId);
if (!supervisorSpecOptional.isPresent()) {
throw new WebApplicationException(
Response.status(Response.Status.BAD_REQUEST)
Response.status(Response.Status.NOT_FOUND)
.entity(StringUtils.format("Cannot find any supervisor with id: [%s]", supervisorId))
.build()
);

View File

@ -78,7 +78,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
if (!taskOptional.isPresent()) {
throw new WebApplicationException(
Response.status(Response.Status.BAD_REQUEST)
Response.status(Response.Status.NOT_FOUND)
.entity(StringUtils.format("Cannot find any task with id: [%s]", taskId))
.build()
);

View File

@ -38,8 +38,10 @@ import org.junit.Before;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -121,6 +123,39 @@ public class SupervisorResourceFilterTest
verifyMocks();
}
@Test
public void testSupervisorNotFound()
{
String dataSource = "not_exist_data_source";
expect(containerRequest.getPathSegments())
.andReturn(getPathSegments("/druid/indexer/v1/supervisor/" + dataSource))
.anyTimes();
expect(containerRequest.getMethod()).andReturn("POST").anyTimes();
SupervisorSpec supervisorSpec = EasyMock.createMock(SupervisorSpec.class);
expect(supervisorSpec.getDataSources())
.andReturn(Collections.singletonList(dataSource))
.anyTimes();
expect(supervisorManager.getSupervisorSpec(dataSource))
.andReturn(Optional.absent())
.atLeastOnce();
EasyMock.replay(containerRequest);
EasyMock.replay(supervisorManager);
WebApplicationException expected = null;
try {
resourceFilter.filter(containerRequest);
}
catch (WebApplicationException e) {
expected = e;
}
Assert.assertNotNull(expected);
Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
EasyMock.verify(containerRequest);
EasyMock.verify(supervisorManager);
}
private void setExpectations(
String path,
String requestMethod,

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http.security;
import com.google.common.base.Optional;
import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.easymock.EasyMock.expect;
public class TaskResourceFilterTest
{
private AuthorizerMapper authorizerMapper;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private ContainerRequest containerRequest;
private TaskResourceFilter resourceFilter;
@Before
public void setup()
{
authorizerMapper = EasyMock.createMock(AuthorizerMapper.class);
taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);
containerRequest = EasyMock.createMock(ContainerRequest.class);
resourceFilter = new TaskResourceFilter(taskStorageQueryAdapter, authorizerMapper);
}
@Test
public void testTaskNotFound()
{
String taskId = "not_exist_task_id";
expect(containerRequest.getPathSegments())
.andReturn(getPathSegments("/task/" + taskId))
.anyTimes();
expect(containerRequest.getMethod()).andReturn("POST").anyTimes();
SupervisorSpec supervisorSpec = EasyMock.createMock(SupervisorSpec.class);
expect(supervisorSpec.getDataSources())
.andReturn(Collections.singletonList(taskId))
.anyTimes();
expect(taskStorageQueryAdapter.getTask(taskId))
.andReturn(Optional.absent())
.atLeastOnce();
EasyMock.replay(containerRequest);
EasyMock.replay(taskStorageQueryAdapter);
WebApplicationException expected = null;
try {
resourceFilter.filter(containerRequest);
}
catch (WebApplicationException e) {
expected = e;
}
Assert.assertNotNull(expected);
Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
EasyMock.verify(containerRequest);
EasyMock.verify(taskStorageQueryAdapter);
}
private List<PathSegment> getPathSegments(String path)
{
String[] segments = path.split("/");
List<PathSegment> pathSegments = new ArrayList<>();
for (final String segment : segments) {
pathSegments.add(new PathSegment()
{
@Override
public String getPath()
{
return segment;
}
@Override
public MultivaluedMap<String, String> getMatrixParameters()
{
return null;
}
});
}
return pathSegments;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
@ -126,7 +127,6 @@ public class OverlordResourceTestClient
StringUtils.urlEncode(taskID)
)
);
LOG.info("Index status response" + response.getContent());
TaskStatusResponse taskStatusResponse = jsonMapper.readValue(
response.getContent(),
@ -136,6 +136,9 @@ public class OverlordResourceTestClient
);
return taskStatusResponse.getStatus();
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -189,6 +192,28 @@ public class OverlordResourceTestClient
}
}
public TaskPayloadResponse getTaskPayload(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format("%stask/%s", getIndexerURL(), taskId)
);
LOG.info("Task %s response %s", taskId, response.getContent());
return jsonMapper.readValue(
response.getContent(), new TypeReference<TaskPayloadResponse>()
{
}
);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public String getTaskLog(String taskId)
{
return getTaskLog(taskId, -88000);
@ -203,6 +228,9 @@ public class OverlordResourceTestClient
);
return response.getContent();
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -242,6 +270,9 @@ public class OverlordResourceTestClient
}
);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -357,6 +388,37 @@ public class OverlordResourceTestClient
}
}
public void shutdownSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST,
new URL(StringUtils.format(
"%ssupervisor/%s/shutdown",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while shutting down supervisor, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
LOG.info("Shutdown supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public void terminateSupervisor(String id)
{
try {
@ -380,6 +442,9 @@ public class OverlordResourceTestClient
}
LOG.info("Terminate supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -408,6 +473,9 @@ public class OverlordResourceTestClient
}
LOG.info("Shutdown task with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -446,6 +514,9 @@ public class OverlordResourceTestClient
LOG.info("Supervisor id[%s] has state [%s]", id, state);
return SupervisorStateManager.BasicState.valueOf(state);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -474,6 +545,71 @@ public class OverlordResourceTestClient
}
LOG.info("Suspended supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public void statsSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(StringUtils.format(
"%ssupervisor/%s/stats",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while stats supervisor, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
LOG.info("stats supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public void getSupervisorHealth(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(StringUtils.format(
"%ssupervisor/%s/health",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while get supervisor health, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
LOG.info("get supervisor health with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -502,6 +638,40 @@ public class OverlordResourceTestClient
}
LOG.info("Resumed supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public void resetSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST,
new URL(StringUtils.format(
"%ssupervisor/%s/reset",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while resetting supervisor, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
LOG.info("Reset supervisor with id[%s]", id);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -521,9 +691,7 @@ public class OverlordResourceTestClient
),
StatusResponseHandler.getInstance()
).get();
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
return null;
} else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting supervisor status, response [%s %s]",
response.getStatus(),
@ -537,6 +705,9 @@ public class OverlordResourceTestClient
);
return responseData;
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
@ -552,6 +723,10 @@ public class OverlordResourceTestClient
}
return response;
}
catch (ISE e) {
LOG.error("Exception while sending request: %s", e.getMessage());
throw e;
}
catch (Exception e) {
LOG.error(e, "Exception while sending request");
throw new RuntimeException(e);

View File

@ -159,4 +159,6 @@ public class TestNGGroup
public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store";
public static final String CUSTOM_COORDINATOR_DUTIES = "custom-coordinator-duties";
public static final String HTTP_ENDPOINT = "http-endpoint";
}

View File

@ -25,6 +25,7 @@ import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -403,7 +404,19 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Verify that auto cleanup eventually removes supervisor spec after termination
ITRetryUtil.retryUntil(
() -> indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()) == null,
() -> {
try {
indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
LOG.warn("Supervisor history should not exist");
return false;
}
catch (ISE e) {
if (e.getMessage().contains("Not Found")) {
return true;
}
throw e;
}
},
true,
10000,
30,

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.query;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.function.Consumer;
@Test(groups = TestNGGroup.HTTP_ENDPOINT)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITOverlordResourceTest
{
@Inject
protected OverlordResourceTestClient indexer;
@Test
public void testGetSupervisorStatusNotFound()
{
callAndCheckNotFound(indexer::getSupervisorStatus);
}
@Test
public void testGetSupervisorHistoryNotFound()
{
callAndCheckNotFound(indexer::getSupervisorHistory);
}
@Test
public void testResumeSupervisorNotFound()
{
callAndCheckNotFound(indexer::resumeSupervisor);
}
@Test
public void testSuspendSupervisorNotFound()
{
callAndCheckNotFound(indexer::suspendSupervisor);
}
@Test
public void testShutdownSupervisorNotFound()
{
callAndCheckNotFound(indexer::shutdownSupervisor);
}
@Test
public void testTerminateSupervisorNotFound()
{
callAndCheckNotFound(indexer::terminateSupervisor);
}
@Test
public void testGetSupervisorHealthNotFound()
{
callAndCheckNotFound(indexer::getSupervisorHealth);
}
@Test
public void testStatsSupervisorNotFound()
{
callAndCheckNotFound(indexer::statsSupervisor);
}
@Test
public void testResetSupervisorNotFound()
{
callAndCheckNotFound(indexer::resetSupervisor);
}
@Test
public void testGetTaskStatusNotFound()
{
callAndCheckNotFound(indexer::getTaskStatus);
}
@Test
public void testShutdownTaskNotFound()
{
callAndCheckNotFound(indexer::shutdownTask);
}
@Test
public void testGetTaskLogNotFound()
{
callAndCheckNotFound(indexer::getTaskLog);
}
@Test
public void testGetTaskReportNotFound()
{
callAndCheckNotFound(indexer::getTaskReport);
}
@Test
public void testGetTaskPayLoadNotFound()
{
callAndCheckNotFound(indexer::getTaskPayload);
}
private void callAndCheckNotFound(Consumer<String> runnable)
{
String supervisorId = "not_exist_id";
try {
runnable.accept(supervisorId);
}
catch (ISE e) {
// OverlordResourceTestClient turns all non-200 response into ISE exception
// So we catch ISE and check if the message in this exception matches expected message
Assert.assertTrue(
e.getMessage().contains("[404 Not Found") && e.getMessage().contains(supervisorId),
"Unexpected exception. Message does not match expected. " + e.getMessage()
);
return;
}
Assert.fail("Should not go to here");
}
}