diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java index e834c2e875e..f498788ec52 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java @@ -78,7 +78,7 @@ public class SupervisorResourceFilter extends AbstractResourceFilter Optional supervisorSpecOptional = supervisorManager.getSupervisorSpec(supervisorId); if (!supervisorSpecOptional.isPresent()) { throw new WebApplicationException( - Response.status(Response.Status.BAD_REQUEST) + Response.status(Response.Status.NOT_FOUND) .entity(StringUtils.format("Cannot find any supervisor with id: [%s]", supervisorId)) .build() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index da1e5c558ee..72ad3bee0a1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -78,7 +78,7 @@ public class TaskResourceFilter extends AbstractResourceFilter Optional taskOptional = taskStorageQueryAdapter.getTask(taskId); if (!taskOptional.isPresent()) { throw new WebApplicationException( - Response.status(Response.Status.BAD_REQUEST) + Response.status(Response.Status.NOT_FOUND) .entity(StringUtils.format("Cannot find any task with id: [%s]", taskId)) .build() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java index 21485a84c61..f8935a70e3c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java @@ -38,8 +38,10 @@ import org.junit.Before; import org.junit.Test; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.PathSegment; +import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -121,6 +123,39 @@ public class SupervisorResourceFilterTest verifyMocks(); } + @Test + public void testSupervisorNotFound() + { + String dataSource = "not_exist_data_source"; + expect(containerRequest.getPathSegments()) + .andReturn(getPathSegments("/druid/indexer/v1/supervisor/" + dataSource)) + .anyTimes(); + expect(containerRequest.getMethod()).andReturn("POST").anyTimes(); + + SupervisorSpec supervisorSpec = EasyMock.createMock(SupervisorSpec.class); + expect(supervisorSpec.getDataSources()) + .andReturn(Collections.singletonList(dataSource)) + .anyTimes(); + expect(supervisorManager.getSupervisorSpec(dataSource)) + .andReturn(Optional.absent()) + .atLeastOnce(); + EasyMock.replay(containerRequest); + EasyMock.replay(supervisorManager); + + WebApplicationException expected = null; + try { + resourceFilter.filter(containerRequest); + } + catch (WebApplicationException e) { + expected = e; + } + + Assert.assertNotNull(expected); + Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + EasyMock.verify(containerRequest); + EasyMock.verify(supervisorManager); + } + private void setExpectations( String path, String requestMethod, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java new file mode 100644 index 00000000000..59abcaacdf8 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http.security; + +import com.google.common.base.Optional; +import com.sun.jersey.spi.container.ContainerRequest; +import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.server.security.AuthorizerMapper; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.PathSegment; +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.easymock.EasyMock.expect; + +public class TaskResourceFilterTest +{ + private AuthorizerMapper authorizerMapper; + private TaskStorageQueryAdapter taskStorageQueryAdapter; + private ContainerRequest containerRequest; + private TaskResourceFilter resourceFilter; + + @Before + public void setup() + { + authorizerMapper = EasyMock.createMock(AuthorizerMapper.class); + taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); + containerRequest = EasyMock.createMock(ContainerRequest.class); + resourceFilter = new TaskResourceFilter(taskStorageQueryAdapter, authorizerMapper); + } + + @Test + public void testTaskNotFound() + { + String taskId = "not_exist_task_id"; + expect(containerRequest.getPathSegments()) + .andReturn(getPathSegments("/task/" + taskId)) + .anyTimes(); + expect(containerRequest.getMethod()).andReturn("POST").anyTimes(); + + SupervisorSpec supervisorSpec = EasyMock.createMock(SupervisorSpec.class); + expect(supervisorSpec.getDataSources()) + .andReturn(Collections.singletonList(taskId)) + .anyTimes(); + expect(taskStorageQueryAdapter.getTask(taskId)) + .andReturn(Optional.absent()) + .atLeastOnce(); + EasyMock.replay(containerRequest); + EasyMock.replay(taskStorageQueryAdapter); + + WebApplicationException expected = null; + try { + resourceFilter.filter(containerRequest); + } + catch (WebApplicationException e) { + expected = e; + } + Assert.assertNotNull(expected); + Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + EasyMock.verify(containerRequest); + EasyMock.verify(taskStorageQueryAdapter); + } + + private List getPathSegments(String path) + { + String[] segments = path.split("/"); + + List pathSegments = new ArrayList<>(); + for (final String segment : segments) { + pathSegments.add(new PathSegment() + { + @Override + public String getPath() + { + return segment; + } + + @Override + public MultivaluedMap getMatrixParameters() + { + return null; + } + }); + } + return pathSegments; + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index dc250397d68..a4932d1d533 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.overlord.http.TaskPayloadResponse; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; @@ -126,7 +127,6 @@ public class OverlordResourceTestClient StringUtils.urlEncode(taskID) ) ); - LOG.info("Index status response" + response.getContent()); TaskStatusResponse taskStatusResponse = jsonMapper.readValue( response.getContent(), @@ -136,6 +136,9 @@ public class OverlordResourceTestClient ); return taskStatusResponse.getStatus(); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -189,6 +192,28 @@ public class OverlordResourceTestClient } } + public TaskPayloadResponse getTaskPayload(String taskId) + { + try { + StatusResponseHolder response = makeRequest( + HttpMethod.GET, + StringUtils.format("%stask/%s", getIndexerURL(), taskId) + ); + LOG.info("Task %s response %s", taskId, response.getContent()); + return jsonMapper.readValue( + response.getContent(), new TypeReference() + { + } + ); + } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + public String getTaskLog(String taskId) { return getTaskLog(taskId, -88000); @@ -203,6 +228,9 @@ public class OverlordResourceTestClient ); return response.getContent(); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -242,6 +270,9 @@ public class OverlordResourceTestClient } ); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -357,6 +388,37 @@ public class OverlordResourceTestClient } } + public void shutdownSupervisor(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/shutdown", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while shutting down supervisor, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("Shutdown supervisor with id[%s]", id); + } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + public void terminateSupervisor(String id) { try { @@ -380,6 +442,9 @@ public class OverlordResourceTestClient } LOG.info("Terminate supervisor with id[%s]", id); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -408,6 +473,9 @@ public class OverlordResourceTestClient } LOG.info("Shutdown task with id[%s]", id); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -446,6 +514,9 @@ public class OverlordResourceTestClient LOG.info("Supervisor id[%s] has state [%s]", id, state); return SupervisorStateManager.BasicState.valueOf(state); } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -474,6 +545,71 @@ public class OverlordResourceTestClient } LOG.info("Suspended supervisor with id[%s]", id); } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void statsSupervisor(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%ssupervisor/%s/stats", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while stats supervisor, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("stats supervisor with id[%s]", id); + } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void getSupervisorHealth(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%ssupervisor/%s/health", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while get supervisor health, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("get supervisor health with id[%s]", id); + } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -502,6 +638,40 @@ public class OverlordResourceTestClient } LOG.info("Resumed supervisor with id[%s]", id); } + catch (ISE e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void resetSupervisor(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/reset", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while resetting supervisor, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("Reset supervisor with id[%s]", id); + } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -521,9 +691,7 @@ public class OverlordResourceTestClient ), StatusResponseHandler.getInstance() ).get(); - if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { - return null; - } else if (!response.getStatus().equals(HttpResponseStatus.OK)) { + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while getting supervisor status, response [%s %s]", response.getStatus(), @@ -537,6 +705,9 @@ public class OverlordResourceTestClient ); return responseData; } + catch (ISE e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } @@ -552,6 +723,10 @@ public class OverlordResourceTestClient } return response; } + catch (ISE e) { + LOG.error("Exception while sending request: %s", e.getMessage()); + throw e; + } catch (Exception e) { LOG.error(e, "Exception while sending request"); throw new RuntimeException(e); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 506ce610226..aeb8a010efd 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -159,4 +159,6 @@ public class TestNGGroup public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store"; public static final String CUSTOM_COORDINATOR_DUTIES = "custom-coordinator-duties"; + + public static final String HTTP_ENDPOINT = "http-endpoint"; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index c118069a8fe..5ce87289752 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; @@ -403,7 +404,19 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest // Verify that auto cleanup eventually removes supervisor spec after termination ITRetryUtil.retryUntil( - () -> indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()) == null, + () -> { + try { + indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()); + LOG.warn("Supervisor history should not exist"); + return false; + } + catch (ISE e) { + if (e.getMessage().contains("Not Found")) { + return true; + } + throw e; + } + }, true, 10000, 30, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java new file mode 100644 index 00000000000..5db3749fe8b --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.query; + +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.testing.clients.OverlordResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.Assert; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.util.function.Consumer; + +@Test(groups = TestNGGroup.HTTP_ENDPOINT) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITOverlordResourceTest +{ + @Inject + protected OverlordResourceTestClient indexer; + + @Test + public void testGetSupervisorStatusNotFound() + { + callAndCheckNotFound(indexer::getSupervisorStatus); + } + + @Test + public void testGetSupervisorHistoryNotFound() + { + callAndCheckNotFound(indexer::getSupervisorHistory); + } + + @Test + public void testResumeSupervisorNotFound() + { + callAndCheckNotFound(indexer::resumeSupervisor); + } + + @Test + public void testSuspendSupervisorNotFound() + { + callAndCheckNotFound(indexer::suspendSupervisor); + } + + @Test + public void testShutdownSupervisorNotFound() + { + callAndCheckNotFound(indexer::shutdownSupervisor); + } + + @Test + public void testTerminateSupervisorNotFound() + { + callAndCheckNotFound(indexer::terminateSupervisor); + } + + @Test + public void testGetSupervisorHealthNotFound() + { + callAndCheckNotFound(indexer::getSupervisorHealth); + } + + @Test + public void testStatsSupervisorNotFound() + { + callAndCheckNotFound(indexer::statsSupervisor); + } + + @Test + public void testResetSupervisorNotFound() + { + callAndCheckNotFound(indexer::resetSupervisor); + } + + @Test + public void testGetTaskStatusNotFound() + { + callAndCheckNotFound(indexer::getTaskStatus); + } + + @Test + public void testShutdownTaskNotFound() + { + callAndCheckNotFound(indexer::shutdownTask); + } + + @Test + public void testGetTaskLogNotFound() + { + callAndCheckNotFound(indexer::getTaskLog); + } + + @Test + public void testGetTaskReportNotFound() + { + callAndCheckNotFound(indexer::getTaskReport); + } + + @Test + public void testGetTaskPayLoadNotFound() + { + callAndCheckNotFound(indexer::getTaskPayload); + } + + private void callAndCheckNotFound(Consumer runnable) + { + String supervisorId = "not_exist_id"; + try { + runnable.accept(supervisorId); + } + catch (ISE e) { + // OverlordResourceTestClient turns all non-200 response into ISE exception + // So we catch ISE and check if the message in this exception matches expected message + Assert.assertTrue( + e.getMessage().contains("[404 Not Found") && e.getMessage().contains(supervisorId), + "Unexpected exception. Message does not match expected. " + e.getMessage() + ); + return; + } + Assert.fail("Should not go to here"); + } +}