diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
index e1d31424e43..8f6f233ba9a 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java
@@ -36,7 +36,6 @@ public class BytesFullResponseHolder extends FullResponseHolder<byte[]>
     this.chunks = new ArrayList<>();
   }
 
-  @Override
   public BytesFullResponseHolder addChunk(byte[] chunk)
   {
     chunks.add(chunk);
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
index 001684d7819..fbbab874ffc 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java
@@ -49,12 +49,7 @@ public abstract class FullResponseHolder<T>
   }
 
   /**
-   * Append a new chunk of data.
-   */
-  public abstract FullResponseHolder<T> addChunk(T chunk);
-
-  /**
-   * Get the accumulated data via {@link #addChunk}.
+   * Get the data.
    */
   public abstract T getContent();
 }
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java
new file mode 100644
index 00000000000..01a69a80f6c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.http.client.response;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+
+/**
+ * This is a clone of {@link InputStreamResponseHandler} except that it retains HTTP status/response object in the
+ * response holder result.
+ */
+public class InputStreamFullResponseHandler implements HttpResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder>
+{
+  @Override
+  public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
+  {
+    InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
+    holder.addChunk(getContentBytes(response.getContent()));
+    return ClientResponse.finished(holder);
+  }
+
+  @Override
+  public ClientResponse<InputStreamFullResponseHolder> handleChunk(
+      ClientResponse<InputStreamFullResponseHolder> clientResponse,
+      HttpChunk chunk,
+      long chunkNum
+  )
+  {
+    clientResponse.getObj().addChunk(getContentBytes(chunk.getContent()));
+    return clientResponse;
+  }
+
+  @Override
+  public ClientResponse<InputStreamFullResponseHolder> done(ClientResponse<InputStreamFullResponseHolder> clientResponse)
+  {
+    InputStreamFullResponseHolder holder = clientResponse.getObj();
+    holder.done();
+    return ClientResponse.finished(holder);
+  }
+
+  @Override
+  public void exceptionCaught(
+      ClientResponse<InputStreamFullResponseHolder> clientResponse,
+      Throwable e
+  )
+  {
+    clientResponse.getObj().exceptionCaught(e);
+  }
+
+  private byte[] getContentBytes(ChannelBuffer content)
+  {
+    byte[] contentBytes = new byte[content.readableBytes()];
+    content.readBytes(contentBytes);
+    return contentBytes;
+  }
+}
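[Reviewer note: illustrative sketch, not part of the patch] The handler above copies each Netty ChannelBuffer chunk into the holder and only returns a finished ClientResponse once done() runs, so callers get the HTTP status and the body stream together. A minimal caller might look like the following; the FetchSketch class name and the httpClient/request parameters are assumptions for illustration, not code from this change.

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.InputStream;

class FetchSketch
{
  static InputStream fetch(HttpClient httpClient, Request request) throws Exception
  {
    // Drive the response through the new handler; the holder keeps both the status and the buffered body.
    ListenableFuture<InputStreamFullResponseHolder> future =
        httpClient.go(request, new InputStreamFullResponseHandler());
    InputStreamFullResponseHolder holder = future.get();

    if (!HttpResponseStatus.OK.equals(holder.getStatus())) {
      throw new RE("Request to [%s] failed with status [%s]", request.getUrl(), holder.getStatus());
    }
    return holder.getContent();
  }
}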
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java
new file mode 100644
index 00000000000..fbabe63a754
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.http.client.response;
+
+import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.InputStream;
+
+public class InputStreamFullResponseHolder extends FullResponseHolder<InputStream>
+{
+  private final AppendableByteArrayInputStream is;
+
+  public InputStreamFullResponseHolder(
+      HttpResponseStatus status,
+      HttpResponse response
+  )
+  {
+    super(status, response);
+    is = new AppendableByteArrayInputStream();
+  }
+
+  public InputStreamFullResponseHolder addChunk(byte[] chunk)
+  {
+    is.add(chunk);
+    return this;
+  }
+
+  @Override
+  public InputStream getContent()
+  {
+    return is;
+  }
+
+  public void done()
+  {
+    is.done();
+  }
+
+  public void exceptionCaught(Throwable t)
+  {
+    is.exceptionCaught(t);
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
index ac7b8aafa8f..3fe2e081b5e 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java
@@ -38,7 +38,6 @@ public class StringFullResponseHolder extends FullResponseHolder<String>
     this.builder = new StringBuilder(response.getContent().toString(charset));
   }
 
-  @Override
   public StringFullResponseHolder addChunk(String chunk)
   {
     builder.append(chunk);
diff --git a/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java b/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java
new file mode 100644
index 00000000000..9c400471103
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.http.client.response;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class InputStreamFullResponseHandlerTest
+{
+  @Test
+  public void testSimple() throws Exception
+  {
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    response.setChunked(false);
+    response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
+
+    InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
+    ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);
+
+    DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
+    clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
+
+    clientResp = responseHandler.done(clientResp);
+
+    Assert.assertTrue(clientResp.isFinished());
+    Assert.assertEquals("abcdefg", IOUtils.toString(clientResp.getObj().getContent(), StandardCharsets.UTF_8));
+  }
+
+  @Test
+  public void testException() throws Exception
+  {
+    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    response.setChunked(false);
+    response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
+
+    InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
+    ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);
+
+    Exception ex = new RuntimeException("dummy!");
+    responseHandler.exceptionCaught(clientResp, ex);
+
+    Assert.assertTrue(clientResp.isFinished());
+  }
+}
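[Reviewer note: illustrative sketch, not part of the patch] The holder can also be exercised without the handler, which makes the accumulation contract explicit: addChunk() appends bytes to the backing AppendableByteArrayInputStream and done() marks the stream complete so a reader sees end-of-stream instead of waiting for more chunks. A compact sketch mirroring testSimple above (the HolderSketch class name is an assumption):

import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;

import java.nio.charset.StandardCharsets;

class HolderSketch
{
  static String readAll() throws Exception
  {
    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);

    holder.addChunk("ab".getBytes(StandardCharsets.UTF_8));
    holder.addChunk("cd".getBytes(StandardCharsets.UTF_8));
    holder.done(); // no more chunks are coming; readers of getContent() now see EOF after "abcd"

    return IOUtils.toString(holder.getContent(), StandardCharsets.UTF_8); // "abcd"
  }
}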
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 651ae635049..fa6c45e9af3 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -509,8 +509,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           url,
           query,
           host,
-          toolChest.decorateObjectMapper(objectMapper, query),
-          null
+          toolChest.decorateObjectMapper(objectMapper, query)
       );
     }
 
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 9478f246764..c12e64a8120 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -25,17 +25,14 @@ import com.fasterxml.jackson.core.ObjectCodec;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.ResourceLimitExceededException;
-import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 
 import javax.annotation.Nullable;
-import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,7 +54,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
   private final String url;
   private final String host;
   private final ObjectMapper objectMapper;
-  private final BytesAccumulatingResponseHandler responseHandler;
   private final boolean hasTimeout;
   private final long timeoutAt;
   private final String queryId;
@@ -68,8 +64,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
       String url,
       @Nullable Query<T> query,
       String host,
-      ObjectMapper objectMapper,
-      BytesAccumulatingResponseHandler responseHandler
+      ObjectMapper objectMapper
   )
   {
     this.typeRef = typeRef;
@@ -85,7 +80,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
     this.jp = null;
     this.host = host;
     this.objectMapper = objectMapper;
-    this.responseHandler = responseHandler;
     this.hasTimeout = timeoutAt > -1;
   }
 
@@ -137,16 +131,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
         InputStream is = hasTimeout
                          ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
                          : future.get();
-        if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
-          interruptQuery(
-              new RE(
-                  "Unexpected response status [%s] description [%s] from request url[%s]",
-                  responseHandler.getStatus(),
-                  responseHandler.getDescription(),
-                  url
-              )
-          );
-        }
         if (is != null) {
           jp = objectMapper.getFactory().createParser(is);
         } else {
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 95b4fa6a32c..2b107bbac3a 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -21,7 +21,6 @@ package org.apache.druid.discovery;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.selector.DiscoverySelector;
 import org.apache.druid.client.selector.Server;
 import org.apache.druid.concurrent.LifecycleLock;
@@ -121,23 +120,13 @@ public class DruidLeaderClient
     log.debug("Stopped.");
   }
 
-  /**
-   * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
-   *
-   * @param cached Uses cached leader if true, else uses the current leader
-   */
-  public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
-  {
-    Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
-    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
-  }
-
   /**
    * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
    */
   public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
   {
-    return makeRequest(httpMethod, urlPath, true);
+    Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
+    return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
   }
 
   public StringFullResponseHolder go(Request request) throws IOException, InterruptedException
@@ -145,18 +134,6 @@ public class DruidLeaderClient
     return go(request, new StringFullResponseHandler(StandardCharsets.UTF_8));
   }
 
-  /**
-   * Executes the request object aimed at the leader and process the response with given handler
-   * Note: this method doesn't do retrying on errors or handle leader changes occurred during communication
-   */
-  public <Intermediate, Final> ListenableFuture<Final> goAsync(
-      final Request request,
-      final HttpResponseHandler<Intermediate, Final> handler
-  )
-  {
-    return httpClient.go(request, handler);
-  }
-
   /**
    * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
    */
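[Reviewer note: illustrative sketch, not part of the patch] With the boolean overload and goAsync() removed, callers of DruidLeaderClient always resolve the cached leader and use the blocking go() path. Assuming an injected DruidLeaderClient named leaderClient (the LeaderCallSketch class is illustrative), a typical call now reads:

import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;

class LeaderCallSketch
{
  static String listTasks(DruidLeaderClient leaderClient) throws Exception
  {
    // makeRequest() now always points at the cached leader; go() blocks and returns the full response.
    Request request = leaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks");
    StringFullResponseHolder holder = leaderClient.go(request);
    return holder.getContent();
  }
}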
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 05cda79fd13..1cc5dc746e8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -20,11 +20,9 @@
 package org.apache.druid.sql.calcite.schema;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.Inject;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
@@ -35,22 +33,16 @@ import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -74,7 +66,6 @@ public class MetadataSegmentView
 
   private final DruidLeaderClient coordinatorDruidLeaderClient;
   private final ObjectMapper jsonMapper;
-  private final BytesAccumulatingResponseHandler responseHandler;
   private final BrokerSegmentWatcherConfig segmentWatcherConfig;
 
   private final boolean isCacheEnabled;
@@ -96,7 +87,6 @@ public class MetadataSegmentView
   public MetadataSegmentView(
       final @Coordinator DruidLeaderClient druidLeaderClient,
       final ObjectMapper jsonMapper,
-      final BytesAccumulatingResponseHandler responseHandler,
       final BrokerSegmentWatcherConfig segmentWatcherConfig,
       final PlannerConfig plannerConfig
   )
@@ -104,7 +94,6 @@ public class MetadataSegmentView
     Preconditions.checkNotNull(plannerConfig, "plannerConfig");
     this.coordinatorDruidLeaderClient = druidLeaderClient;
     this.jsonMapper = jsonMapper;
-    this.responseHandler = responseHandler;
     this.segmentWatcherConfig = segmentWatcherConfig;
     this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
     this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
@@ -148,7 +137,6 @@ public class MetadataSegmentView
     final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
         coordinatorDruidLeaderClient,
         jsonMapper,
-        responseHandler,
         segmentWatcherConfig.getWatchedDataSources()
     );
 
@@ -175,7 +163,6 @@ public class MetadataSegmentView
       return getMetadataSegments(
           coordinatorDruidLeaderClient,
           jsonMapper,
-          responseHandler,
           segmentWatcherConfig.getWatchedDataSources()
       );
     }
@@ -185,7 +172,6 @@ public class MetadataSegmentView
   private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
       DruidLeaderClient coordinatorClient,
       ObjectMapper jsonMapper,
-      BytesAccumulatingResponseHandler responseHandler,
       Set<String> watchedDataSources
   )
   {
@@ -200,33 +186,14 @@ public class MetadataSegmentView
       sb.setLength(sb.length() - 1);
       query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb;
     }
 
-    Request request;
-    try {
-      request = coordinatorClient.makeRequest(
-          HttpMethod.GET,
-          StringUtils.format(query),
-          false
-      );
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    ListenableFuture<InputStream> future = coordinatorClient.goAsync(
-        request,
-        responseHandler
-    );
-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentWithOvershadowedStatus>()
-    {
-    });
-    return new JsonParserIterator<>(
-        typeRef,
-        future,
-        request.getUrl().toString(),
-        null,
-        request.getUrl().getHost(),
-        jsonMapper,
-        responseHandler
+    return SystemSchema.getThingsFromLeaderNode(
+        query,
+        new TypeReference<SegmentWithOvershadowedStatus>()
+        {
+        },
+        coordinatorClient,
+        jsonMapper
     );
   }
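[Reviewer note: illustrative sketch, not part of the patch] After this change the broker's segment poll reduces to a call into the shared helper plus iteration over the returned JsonParserIterator, where each next() deserializes one SegmentWithOvershadowedStatus. A sketch of that flow (the SegmentPollSketch class and its parameters are illustrative; the endpoint string is the base path used above):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;

class SegmentPollSketch
{
  static int countSegments(DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper) throws Exception
  {
    int count = 0;
    // JsonParserIterator is Closeable, so try-with-resources releases the underlying stream.
    try (JsonParserIterator<SegmentWithOvershadowedStatus> it = SystemSchema.getThingsFromLeaderNode(
        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus",
        new TypeReference<SegmentWithOvershadowedStatus>() {},
        coordinatorClient,
        jsonMapper
    )) {
      while (it.hasNext()) {
        it.next();
        count++;
      }
    }
    return count;
  }
}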
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index afe47fda807..0237b86512c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.inject.Inject;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.DefaultEnumerable;
@@ -58,13 +58,15 @@ import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthenticationResult;
@@ -81,8 +83,8 @@ import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
 import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -209,13 +211,12 @@ public class SystemSchema extends AbstractSchema
   )
   {
     Preconditions.checkNotNull(serverView, "serverView");
-    BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
     this.tableMap = ImmutableMap.of(
         SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper),
         SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
         SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
-        TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
-        SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
+        TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper),
+        SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper)
     );
   }
 
@@ -674,19 +675,16 @@ public class SystemSchema extends AbstractSchema
   {
     private final DruidLeaderClient druidLeaderClient;
     private final ObjectMapper jsonMapper;
-    private final BytesAccumulatingResponseHandler responseHandler;
     private final AuthorizerMapper authorizerMapper;
 
     public TasksTable(
         DruidLeaderClient druidLeaderClient,
         ObjectMapper jsonMapper,
-        BytesAccumulatingResponseHandler responseHandler,
         AuthorizerMapper authorizerMapper
     )
     {
       this.druidLeaderClient = druidLeaderClient;
       this.jsonMapper = jsonMapper;
-      this.responseHandler = responseHandler;
       this.authorizerMapper = authorizerMapper;
     }
 
@@ -788,7 +786,7 @@ public class SystemSchema extends AbstractSchema
         }
       }
 
-      return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler));
+      return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper));
     }
 
     private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(
@@ -819,37 +817,16 @@ public class SystemSchema extends AbstractSchema
   //Note that overlord must be up to get tasks
   private static JsonParserIterator<TaskStatusPlus> getTasks(
       DruidLeaderClient indexingServiceClient,
-      ObjectMapper jsonMapper,
-      BytesAccumulatingResponseHandler responseHandler
+      ObjectMapper jsonMapper
   )
   {
-    Request request;
-    try {
-      request = indexingServiceClient.makeRequest(
-          HttpMethod.GET,
-          "/druid/indexer/v1/tasks",
-          false
-      );
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
-        request,
-        responseHandler
-    );
-
-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
-    {
-    });
-    return new JsonParserIterator<>(
-        typeRef,
-        future,
-        request.getUrl().toString(),
-        null,
-        request.getUrl().getHost(),
-        jsonMapper,
-        responseHandler
+    return getThingsFromLeaderNode(
+        "/druid/indexer/v1/tasks",
+        new TypeReference<TaskStatusPlus>()
+        {
+        },
+        indexingServiceClient,
+        jsonMapper
     );
   }
 
@@ -860,19 +837,16 @@ public class SystemSchema extends AbstractSchema
   {
     private final DruidLeaderClient druidLeaderClient;
     private final ObjectMapper jsonMapper;
-    private final BytesAccumulatingResponseHandler responseHandler;
     private final AuthorizerMapper authorizerMapper;
 
     public SupervisorsTable(
         DruidLeaderClient druidLeaderClient,
         ObjectMapper jsonMapper,
-        BytesAccumulatingResponseHandler responseHandler,
        AuthorizerMapper authorizerMapper
     )
     {
       this.druidLeaderClient = druidLeaderClient;
       this.jsonMapper = jsonMapper;
-      this.responseHandler = responseHandler;
       this.authorizerMapper = authorizerMapper;
     }
 
@@ -954,7 +928,7 @@ public class SystemSchema extends AbstractSchema
         }
       }
 
-      return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler));
+      return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper));
     }
 
     private CloseableIterator<SupervisorStatus> getAuthorizedSupervisors(
@@ -985,37 +959,60 @@ public class SystemSchema extends AbstractSchema
   // will fail with internal server error (HTTP 500)
   private static JsonParserIterator<SupervisorStatus> getSupervisors(
       DruidLeaderClient indexingServiceClient,
-      ObjectMapper jsonMapper,
-      BytesAccumulatingResponseHandler responseHandler
+      ObjectMapper jsonMapper
+  )
+  {
+    return getThingsFromLeaderNode(
+        "/druid/indexer/v1/supervisor?system",
+        new TypeReference<SupervisorStatus>()
+        {
+        },
+        indexingServiceClient,
+        jsonMapper
+    );
+  }
+
+  public static <T> JsonParserIterator<T> getThingsFromLeaderNode(
+      String query,
+      TypeReference<T> typeRef,
+      DruidLeaderClient leaderClient,
+      ObjectMapper jsonMapper
   )
   {
     Request request;
+    InputStreamFullResponseHolder responseHolder;
     try {
-      request = indexingServiceClient.makeRequest(
+      request = leaderClient.makeRequest(
           HttpMethod.GET,
-          "/druid/indexer/v1/supervisor?system",
-          false
+          query
      );
+
+      responseHolder = leaderClient.go(
+          request,
+          new InputStreamFullResponseHandler()
+      );
+
+      if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+        throw new RE(
+            "Failed to talk to leader node at [%s]. Error code[%d], description[%s].",
+            query,
+            responseHolder.getStatus().getCode(),
+            responseHolder.getStatus().getReasonPhrase()
+        );
+      }
     }
-    catch (IOException e) {
+    catch (IOException | InterruptedException e) {
       throw new RuntimeException(e);
     }
-    ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
-        request,
-        responseHandler
-    );
 
-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SupervisorStatus>()
-    {
-    });
+    final JavaType javaType = jsonMapper.getTypeFactory().constructType(typeRef);
     return new JsonParserIterator<>(
-        typeRef,
-        future,
+        javaType,
+        Futures.immediateFuture(responseHolder.getContent()),
         request.getUrl().toString(),
         null,
         request.getUrl().getHost(),
-        jsonMapper,
-        responseHandler
+        jsonMapper
     );
   }
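[Reviewer note: illustrative sketch, not part of the patch] Design-wise, getThingsFromLeaderNode() trades the old asynchronous path for a blocking one: DruidLeaderClient.go() has already buffered the whole body by the time it returns, so the iterator is handed an already-completed future via Futures.immediateFuture(responseHolder.getContent()), and a non-200 status now surfaces immediately as an RE rather than being checked later inside JsonParserIterator. A usage sketch for the tasks endpoint (the TaskListSketch class and the overlordClient parameter are illustrative):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.sql.calcite.schema.SystemSchema;

class TaskListSketch
{
  static JsonParserIterator<TaskStatusPlus> openTaskIterator(DruidLeaderClient overlordClient, ObjectMapper jsonMapper)
  {
    // Throws RE if the Overlord answers with a non-200 status; otherwise the body is already fully buffered.
    return SystemSchema.getThingsFromLeaderNode(
        "/druid/indexer/v1/tasks",
        new TypeReference<TaskStatusPlus>() {},
        overlordClient,
        jsonMapper
    );
  }
}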
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 08fe2ea5b70..b365c7e4b7f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.SettableFuture;
 import junitparams.converters.Nullable;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -54,8 +53,9 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -95,8 +95,11 @@ import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -105,10 +108,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.servlet.http.HttpServletResponse;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -1036,17 +1037,18 @@ public class SystemSchemaTest extends CalciteTestBase
   {
     SystemSchema.TasksTable tasksTable = EasyMock.createMockBuilder(SystemSchema.TasksTable.class)
-                                                 .withConstructor(client, mapper, responseHandler, authMapper)
+                                                 .withConstructor(client, mapper, authMapper)
                                                  .createMock();
-    EasyMock.replay(tasksTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
-    SettableFuture<InputStream> future = SettableFuture.create();
-    EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
-    final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
-    EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
-    AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
+    EasyMock.replay(tasksTable);
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+
+
+    HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
+
+    EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
+    EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
 
     String json = "[{\n"
                   + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
@@ -1082,9 +1084,8 @@ public class SystemSchemaTest extends CalciteTestBase
                   + "\t\"errorMsg\": null\n"
                   + "}]";
     byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
-    in.add(bytesToWrite);
-    in.done();
-    future.set(in);
+    responseHolder.addChunk(bytesToWrite);
+    responseHolder.done();
 
     EasyMock.replay(client, request, responseHandler);
     DataContext dataContext = new DataContext()
@@ -1159,24 +1160,24 @@ public class SystemSchemaTest extends CalciteTestBase
         .withConstructor(
             client,
             mapper,
-            responseHandler,
             authMapper
         )
         .createMock();
     EasyMock.replay(supervisorTable);
-    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false))
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system"))
             .andReturn(request)
             .anyTimes();
-    SettableFuture<InputStream> future = SettableFuture.create();
-    EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
-    final int ok = HttpServletResponse.SC_OK;
-    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
+
+    HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+    InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
+
+    EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
+
+    EasyMock.expect(responseHandler.getStatus()).andReturn(httpResp.getStatus().getCode()).anyTimes();
     EasyMock.expect(request.getUrl())
             .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
             .anyTimes();
-    AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
-
     String json = "[{\n"
                   + "\t\"id\": \"wikipedia\",\n"
                   + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
@@ -1190,9 +1191,8 @@ public class SystemSchemaTest extends CalciteTestBase
                   + "}]";
     byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
-    in.add(bytesToWrite);
-    in.done();
-    future.set(in);
+    responseHolder.addChunk(bytesToWrite);
+    responseHolder.done();
 
     EasyMock.replay(client, request, responseHandler);
     DataContext dataContext = new DataContext()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 0c186ed5558..67d4c0b18ab 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -93,7 +93,6 @@ import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.log.NoopRequestLogger;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AllowAllAuthenticator;
@@ -999,7 +998,6 @@ public class CalciteTests
         new MetadataSegmentView(
             druidLeaderClient,
             getJsonMapper(),
-            new BytesAccumulatingResponseHandler(),
             new BrokerSegmentWatcherConfig(),
             plannerConfig
         ),