diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 35ad2768158..60f9f3db947 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -33,6 +33,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.error.ErrorResponse; import org.apache.druid.error.Forbidden; import org.apache.druid.error.InvalidInput; +import org.apache.druid.error.NotFound; import org.apache.druid.error.QueryExceptionCompat; import org.apache.druid.frame.channel.FrameChannelSequence; import org.apache.druid.guice.annotations.MSQ; @@ -68,6 +69,7 @@ import org.apache.druid.query.ExecutionMode; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryException; +import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.QueryResponse; import org.apache.druid.server.security.Access; @@ -84,6 +86,7 @@ import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.sql.http.SqlResource; import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -251,7 +254,7 @@ public class SqlStatementResource if (sqlStatementResult.isPresent()) { return Response.ok().entity(sqlStatementResult.get()).build(); } else { - return Response.status(Response.Status.NOT_FOUND).build(); + throw queryNotFoundException(queryId); } } catch (DruidException e) { @@ -297,14 +300,14 @@ public class SqlStatementResource ); } - TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId)); + TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId); if (taskResponse == null) { - return Response.status(Response.Status.NOT_FOUND).build(); + throw queryNotFoundException(queryId); } TaskStatusPlus statusPlus = taskResponse.getStatus(); if (statusPlus == null || !MSQControllerTask.TYPE.equals(statusPlus.getType())) { - return Response.status(Response.Status.NOT_FOUND).build(); + throw queryNotFoundException(queryId); } MSQControllerTask msqControllerTask = getMSQControllerTaskOrThrow(queryId, authenticationResult.getIdentity()); @@ -393,9 +396,8 @@ public class SqlStatementResource default: throw new ISE("Illegal State[%s] encountered", sqlStatementResult.get().getState()); } - } else { - return Response.status(Response.Status.NOT_FOUND).build(); + throw queryNotFoundException(queryId); } } catch (DruidException e) { @@ -522,8 +524,11 @@ public class SqlStatementResource ) { if (sqlStatementState == SqlStatementState.SUCCESS) { - Map payload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap( - queryId))); + Map payload = + SqlStatementResourceHelper.getPayload(contactOverlord( + overlordClient.taskReportAsMap(queryId), + queryId + )); MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(payload, MSQTaskReportPayload.class); Optional> pageList = SqlStatementResourceHelper.populatePageList( msqTaskReportPayload, @@ -590,7 +595,7 @@ public class SqlStatementResource private Optional getStatementStatus(String queryId, String currentUser, boolean withResults) throws DruidException { - TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId)); + TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId); if (taskResponse == null) { return Optional.empty(); } @@ -610,8 +615,7 @@ public class SqlStatementResource taskResponse, statusPlus, sqlStatementState, - contactOverlord(overlordClient.taskReportAsMap( - queryId)) + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId) ); } else { Optional> signature = SqlStatementResourceHelper.getSignature(msqControllerTask); @@ -635,7 +639,7 @@ public class SqlStatementResource private MSQControllerTask getMSQControllerTaskOrThrow(String queryId, String currentUser) { - TaskPayloadResponse taskPayloadResponse = contactOverlord(overlordClient.taskPayload(queryId)); + TaskPayloadResponse taskPayloadResponse = contactOverlord(overlordClient.taskPayload(queryId), queryId); SqlStatementResourceHelper.isMSQPayload(taskPayloadResponse, queryId); MSQControllerTask msqControllerTask = (MSQControllerTask) taskPayloadResponse.getPayload(); @@ -672,7 +676,7 @@ public class SqlStatementResource } MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload( - contactOverlord(overlordClient.taskReportAsMap(queryId))), MSQTaskReportPayload.class); + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class); if (msqTaskReportPayload.getResults().getResultYielder() == null) { results = Optional.empty(); @@ -683,7 +687,7 @@ public class SqlStatementResource } else if (msqControllerTask.getQuerySpec().getDestination() instanceof DurableStorageMSQDestination) { MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload( - contactOverlord(overlordClient.taskReportAsMap(queryId))), MSQTaskReportPayload.class); + contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class); List pages = SqlStatementResourceHelper.populatePageList( @@ -721,7 +725,8 @@ public class SqlStatementResource return new FrameChannelSequence(standardImplementation.openChannel( finalStage.getId(), (int) pageInformation.getId(), - (int) pageInformation.getId()// we would always have partition number == worker number + (int) pageInformation.getId() +// we would always have partition number == worker number )); } catch (Exception e) { @@ -873,17 +878,30 @@ public class SqlStatementResource } } - private T contactOverlord(final ListenableFuture future) + private T contactOverlord(final ListenableFuture future, String queryId) { try { return FutureUtils.getUnchecked(future, true); } catch (RuntimeException e) { + if (e.getCause() instanceof HttpResponseException) { + HttpResponseException httpResponseException = (HttpResponseException) e.getCause(); + if (httpResponseException.getResponse() != null && httpResponseException.getResponse().getResponse().getStatus() + .equals(HttpResponseStatus.NOT_FOUND)) { + log.info(httpResponseException, "Query details not found for queryId [%s]", queryId); + // since we get a 404, we mark the request as a NotFound. This code path is generally triggered when user passes a `queryId` which is not found in the overlord. + throw queryNotFoundException(queryId); + } + } throw DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) .build("Unable to contact overlord " + e.getMessage()); } } + private static DruidException queryNotFoundException(String queryId) + { + return NotFound.exception("Query [%s] was not found. The query details are no longer present or might not be of the type [%s]. Verify that the id is correct.", queryId, MSQControllerTask.TYPE); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java index a498aad60af..08bc3dc54d9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -26,6 +26,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.error.DruidException; +import org.apache.druid.error.NotFound; import org.apache.druid.frame.Frame; import org.apache.druid.frame.processor.FrameProcessors; import org.apache.druid.indexer.TaskLocation; @@ -104,17 +105,11 @@ public class SqlStatementResourceHelper public static void isMSQPayload(TaskPayloadResponse taskPayloadResponse, String queryId) throws DruidException { if (taskPayloadResponse == null || taskPayloadResponse.getPayload() == null) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - "Query[%s] not found", queryId); + throw NotFound.exception("Query[%s] not found", queryId); } if (MSQControllerTask.class != taskPayloadResponse.getPayload().getClass()) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - "Query[%s] not found", queryId); + throw NotFound.exception("Query[%s] not found", queryId); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java index aab89bb4287..0315eaa874b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; @@ -40,6 +41,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; @@ -65,6 +67,7 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; @@ -77,6 +80,9 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.http.SqlResourceTest; import org.apache.druid.storage.local.LocalFileStorageConnector; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Before; @@ -87,6 +93,7 @@ import org.mockito.Mockito; import javax.ws.rs.core.Response; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -249,11 +256,12 @@ public class SqlStatementResourceTest extends MSQTestBase 0, new CounterSnapshots(ImmutableMap.of( "output", - new ChannelCounters.Snapshot(new long[]{1L, 2L}, - new long[]{3L, 5L}, - new long[]{}, - new long[]{}, - new long[]{} + new ChannelCounters.Snapshot( + new long[]{1L, 2L}, + new long[]{3L, 5L}, + new long[]{}, + new long[]{}, + new long[]{} ) ) ) @@ -587,10 +595,9 @@ public class SqlStatementResourceTest extends MSQTestBase } - public static void assertNullResponse(Response response, Response.Status expectectedStatus) + public static void assertNotFound(Response response, String queryId) { - Assert.assertEquals(expectectedStatus.getStatusCode(), response.getStatus()); - Assert.assertNull(response.getEntity()); + assertExceptionMessage(response, StringUtils.format("Query [%s] was not found. The query details are no longer present or might not be of the type [%s]. Verify that the id is correct.", queryId, MSQControllerTask.TYPE), Response.Status.NOT_FOUND); } public static void assertExceptionMessage( @@ -830,9 +837,9 @@ public class SqlStatementResourceTest extends MSQTestBase public void testNonMSQTasks() { for (String queryID : ImmutableList.of(RUNNING_NON_MSQ_TASK, FAILED_NON_MSQ_TASK, FINISHED_NON_MSQ_TASK)) { - assertNullResponse(resource.doGetStatus(queryID, makeOkRequest()), Response.Status.NOT_FOUND); - assertNullResponse(resource.doGetResults(queryID, 0L, makeOkRequest()), Response.Status.NOT_FOUND); - assertNullResponse(resource.deleteQuery(queryID, makeOkRequest()), Response.Status.NOT_FOUND); + assertNotFound(resource.doGetStatus(queryID, makeOkRequest()), queryID); + assertNotFound(resource.doGetResults(queryID, 0L, makeOkRequest()), queryID); + assertNotFound(resource.deleteQuery(queryID, makeOkRequest()), queryID); } } @@ -903,7 +910,7 @@ public class SqlStatementResourceTest extends MSQTestBase } @Test - public void forbiddenTests() + public void testForbiddenRequest() { Assert.assertEquals( Response.Status.FORBIDDEN.getStatusCode(), @@ -929,6 +936,33 @@ public class SqlStatementResourceTest extends MSQTestBase ); } + @Test + public void testTaskIdNotFound() + { + String taskIdNotFound = "notFound"; + final DefaultHttpResponse incorrectResponse = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.setException(new HttpResponseException(new StringFullResponseHolder( + incorrectResponse, + StandardCharsets.UTF_8 + ))); + Mockito.when(overlordClient.taskStatus(taskIdNotFound)).thenReturn(settableFuture); + + Assert.assertEquals( + Response.Status.NOT_FOUND.getStatusCode(), + resource.doGetStatus(taskIdNotFound, makeOkRequest()).getStatus() + ); + Assert.assertEquals( + Response.Status.NOT_FOUND.getStatusCode(), + resource.doGetResults(taskIdNotFound, null, makeOkRequest()).getStatus() + ); + Assert.assertEquals( + Response.Status.NOT_FOUND.getStatusCode(), + resource.deleteQuery(taskIdNotFound, makeOkRequest()).getStatus() + ); + } + @Test public void testIsEnabled() { diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index ca0bf0e1d6f..6c3b56af998 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -340,6 +340,11 @@ public class DruidException extends RuntimeException * Means that an action that was attempted is forbidden */ FORBIDDEN(403), + + /** + * Means that the requsted requested resource cannot be found. + */ + NOT_FOUND(404), /** * Means that some capacity limit was exceeded, this could be due to throttling or due to some system limit */ diff --git a/processing/src/main/java/org/apache/druid/error/NotFound.java b/processing/src/main/java/org/apache/druid/error/NotFound.java new file mode 100644 index 00000000000..03d2a107702 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/error/NotFound.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +public class NotFound extends DruidException.Failure +{ + + public static DruidException exception(String msg, Object... args) + { + return exception(null, msg, args); + } + + public static DruidException exception(Throwable t, String msg, Object... args) + { + return DruidException.fromFailure(new NotFound(t, msg, args)); + } + + private final Throwable t; + private final String msg; + private final Object[] args; + + public NotFound( + Throwable t, + String msg, + Object... args + ) + { + super("notFound"); + this.t = t; + this.msg = msg; + this.args = args; + } + + + @Override + public DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + bob = bob.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.NOT_FOUND); + + if (t == null) { + return bob.build(msg, args); + } else { + return bob.build(t, msg, args); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/error/NotFoundTest.java b/processing/src/test/java/org/apache/druid/error/NotFoundTest.java new file mode 100644 index 00000000000..6a46e69bcac --- /dev/null +++ b/processing/src/test/java/org/apache/druid/error/NotFoundTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +import org.apache.druid.matchers.DruidMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class NotFoundTest +{ + @Test + public void testAsErrorResponse() + { + ErrorResponse errorResponse = new ErrorResponse(NotFound.exception( + new IOException("could not open file"), + "id not found" + )); + final Map asMap = errorResponse.getAsMap(); + MatcherAssert.assertThat( + asMap, + DruidMatchers.mapMatcher( + "error", "druidException", + "errorCode", "notFound", + "persona", "USER", + "category", "NOT_FOUND", + "errorMessage", "id not found" + ) + ); + + ErrorResponse recomposed = ErrorResponse.fromMap(asMap); + + MatcherAssert.assertThat( + recomposed.getUnderlyingException(), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.NOT_FOUND, + "notFound" + ).expectMessageContains("id not found") + ); + } +}