remove DruidLeaderClient.goAsync(..) that does not follow redirect. Replace its usage by DruidLeaderClient.go(..) with InputStreamFullResponseHandler (#9717)

* remove DruidLeaderClient.goAsync(..) that does not follow redirect.
Replace its usage by DruidLeaadereClient.go(..) with
InputStreamFullResponseHandler

* remove ByteArrayResponseHolder dependency from JsonParserIterator

* add UT to cover lines in InputStreamFullResponseHandler

* refactor SystemSchema to reduce branches

* further reduce branches

* Revert "add UT to cover lines in InputStreamFullResponseHandler"

This reverts commit 330aba3dd9.

* UTs for InputStreamFullResponseHandler

* remove unused imports
This commit is contained in:
Himanshu 2020-08-14 10:51:18 -07:00 committed by GitHub
parent 6cca7242de
commit 12ae84165e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 303 additions and 181 deletions

View File

@ -36,7 +36,6 @@ public class BytesFullResponseHolder extends FullResponseHolder<byte[]>
this.chunks = new ArrayList<>(); this.chunks = new ArrayList<>();
} }
@Override
public BytesFullResponseHolder addChunk(byte[] chunk) public BytesFullResponseHolder addChunk(byte[] chunk)
{ {
chunks.add(chunk); chunks.add(chunk);

View File

@ -49,12 +49,7 @@ public abstract class FullResponseHolder<T>
} }
/** /**
* Append a new chunk of data. * Get the data.
*/
public abstract FullResponseHolder addChunk(T chunk);
/**
* Get the accumulated data via {@link #addChunk}.
*/ */
public abstract T getContent(); public abstract T getContent();
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.response;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
/**
* This is a clone of {@link InputStreamResponseHandler} except that it retains HTTP status/response object in the
* response holder result.
*/
public class InputStreamFullResponseHandler implements HttpResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder>
{
@Override
public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
holder.addChunk(getContentBytes(response.getContent()));
return ClientResponse.finished(holder);
}
@Override
public ClientResponse<InputStreamFullResponseHolder> handleChunk(
ClientResponse<InputStreamFullResponseHolder> clientResponse,
HttpChunk chunk,
long chunkNum
)
{
clientResponse.getObj().addChunk(getContentBytes(chunk.getContent()));
return clientResponse;
}
@Override
public ClientResponse<InputStreamFullResponseHolder> done(ClientResponse<InputStreamFullResponseHolder> clientResponse)
{
InputStreamFullResponseHolder holder = clientResponse.getObj();
holder.done();
return ClientResponse.finished(holder);
}
@Override
public void exceptionCaught(
ClientResponse<InputStreamFullResponseHolder> clientResponse,
Throwable e
)
{
clientResponse.getObj().exceptionCaught(e);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.response;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.io.InputStream;
public class InputStreamFullResponseHolder extends FullResponseHolder<InputStream>
{
private final AppendableByteArrayInputStream is;
public InputStreamFullResponseHolder(
HttpResponseStatus status,
HttpResponse response
)
{
super(status, response);
is = new AppendableByteArrayInputStream();
}
public InputStreamFullResponseHolder addChunk(byte[] chunk)
{
is.add(chunk);
return this;
}
@Override
public InputStream getContent()
{
return is;
}
public void done()
{
is.done();
}
public void exceptionCaught(Throwable t)
{
is.exceptionCaught(t);
}
}

View File

@ -38,7 +38,6 @@ public class StringFullResponseHolder extends FullResponseHolder<String>
this.builder = new StringBuilder(response.getContent().toString(charset)); this.builder = new StringBuilder(response.getContent().toString(charset));
} }
@Override
public StringFullResponseHolder addChunk(String chunk) public StringFullResponseHolder addChunk(String chunk)
{ {
builder.append(chunk); builder.append(chunk);

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.response;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
public class InputStreamFullResponseHandlerTest
{
@Test
public void testSimple() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);
DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
clientResp = responseHandler.done(clientResp);
Assert.assertTrue(clientResp.isFinished());
Assert.assertEquals("abcdefg", IOUtils.toString(clientResp.getObj().getContent(), StandardCharsets.UTF_8));
}
@Test
public void testException() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);
Exception ex = new RuntimeException("dummy!");
responseHandler.exceptionCaught(clientResp, ex);
Assert.assertTrue(clientResp.isFinished());
}
}

View File

@ -509,8 +509,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
url, url,
query, query,
host, host,
toolChest.decorateObjectMapper(objectMapper, query), toolChest.decorateObjectMapper(objectMapper, query)
null
); );
} }

View File

@ -25,17 +25,14 @@ import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -57,7 +54,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
private final String url; private final String url;
private final String host; private final String host;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final boolean hasTimeout; private final boolean hasTimeout;
private final long timeoutAt; private final long timeoutAt;
private final String queryId; private final String queryId;
@ -68,8 +64,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
String url, String url,
@Nullable Query<T> query, @Nullable Query<T> query,
String host, String host,
ObjectMapper objectMapper, ObjectMapper objectMapper
BytesAccumulatingResponseHandler responseHandler
) )
{ {
this.typeRef = typeRef; this.typeRef = typeRef;
@ -85,7 +80,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
this.jp = null; this.jp = null;
this.host = host; this.host = host;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.responseHandler = responseHandler;
this.hasTimeout = timeoutAt > -1; this.hasTimeout = timeoutAt > -1;
} }
@ -137,16 +131,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
InputStream is = hasTimeout InputStream is = hasTimeout
? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
: future.get(); : future.get();
if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
interruptQuery(
new RE(
"Unexpected response status [%s] description [%s] from request url[%s]",
responseHandler.getStatus(),
responseHandler.getDescription(),
url
)
);
}
if (is != null) { if (is != null) {
jp = objectMapper.getFactory().createParser(is); jp = objectMapper.getFactory().createParser(is);
} else { } else {

View File

@ -21,7 +21,6 @@ package org.apache.druid.discovery;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.selector.DiscoverySelector; import org.apache.druid.client.selector.DiscoverySelector;
import org.apache.druid.client.selector.Server; import org.apache.druid.client.selector.Server;
import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.concurrent.LifecycleLock;
@ -121,23 +120,13 @@ public class DruidLeaderClient
log.debug("Stopped."); log.debug("Stopped.");
} }
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*
* @param cached Uses cached leader if true, else uses the current leader
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
}
/** /**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/ */
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{ {
return makeRequest(httpMethod, urlPath, true); Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
} }
public StringFullResponseHolder go(Request request) throws IOException, InterruptedException public StringFullResponseHolder go(Request request) throws IOException, InterruptedException
@ -145,18 +134,6 @@ public class DruidLeaderClient
return go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)); return go(request, new StringFullResponseHandler(StandardCharsets.UTF_8));
} }
/**
* Executes the request object aimed at the leader and process the response with given handler
* Note: this method doesn't do retrying on errors or handle leader changes occurred during communication
*/
public <Intermediate, Final> ListenableFuture<Final> goAsync(
final Request request,
final HttpResponseHandler<Intermediate, Final> handler
)
{
return httpClient.go(request, handler);
}
/** /**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located. * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/ */

View File

@ -20,11 +20,9 @@
package org.apache.druid.sql.calcite.schema; package org.apache.druid.sql.calcite.schema;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerSegmentWatcherConfig;
@ -35,22 +33,16 @@ import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -74,7 +66,6 @@ public class MetadataSegmentView
private final DruidLeaderClient coordinatorDruidLeaderClient; private final DruidLeaderClient coordinatorDruidLeaderClient;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final BrokerSegmentWatcherConfig segmentWatcherConfig;
private final boolean isCacheEnabled; private final boolean isCacheEnabled;
@ -96,7 +87,6 @@ public class MetadataSegmentView
public MetadataSegmentView( public MetadataSegmentView(
final @Coordinator DruidLeaderClient druidLeaderClient, final @Coordinator DruidLeaderClient druidLeaderClient,
final ObjectMapper jsonMapper, final ObjectMapper jsonMapper,
final BytesAccumulatingResponseHandler responseHandler,
final BrokerSegmentWatcherConfig segmentWatcherConfig, final BrokerSegmentWatcherConfig segmentWatcherConfig,
final PlannerConfig plannerConfig final PlannerConfig plannerConfig
) )
@ -104,7 +94,6 @@ public class MetadataSegmentView
Preconditions.checkNotNull(plannerConfig, "plannerConfig"); Preconditions.checkNotNull(plannerConfig, "plannerConfig");
this.coordinatorDruidLeaderClient = druidLeaderClient; this.coordinatorDruidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.segmentWatcherConfig = segmentWatcherConfig; this.segmentWatcherConfig = segmentWatcherConfig;
this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod(); this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
@ -148,7 +137,6 @@ public class MetadataSegmentView
final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments( final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
coordinatorDruidLeaderClient, coordinatorDruidLeaderClient,
jsonMapper, jsonMapper,
responseHandler,
segmentWatcherConfig.getWatchedDataSources() segmentWatcherConfig.getWatchedDataSources()
); );
@ -175,7 +163,6 @@ public class MetadataSegmentView
return getMetadataSegments( return getMetadataSegments(
coordinatorDruidLeaderClient, coordinatorDruidLeaderClient,
jsonMapper, jsonMapper,
responseHandler,
segmentWatcherConfig.getWatchedDataSources() segmentWatcherConfig.getWatchedDataSources()
); );
} }
@ -185,7 +172,6 @@ public class MetadataSegmentView
private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments( private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
DruidLeaderClient coordinatorClient, DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
Set<String> watchedDataSources Set<String> watchedDataSources
) )
{ {
@ -200,33 +186,14 @@ public class MetadataSegmentView
sb.setLength(sb.length() - 1); sb.setLength(sb.length() - 1);
query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb;
} }
Request request;
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format(query),
false
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = coordinatorClient.goAsync(
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentWithOvershadowedStatus>() return SystemSchema.getThingsFromLeaderNode(
{ query,
}); new TypeReference<SegmentWithOvershadowedStatus>()
return new JsonParserIterator<>( {
typeRef, },
future, coordinatorClient,
request.getUrl().toString(), jsonMapper
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
); );
} }

View File

@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.calcite.DataContext; import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.DefaultEnumerable; import org.apache.calcite.linq4j.DefaultEnumerable;
@ -58,13 +58,15 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthenticationResult;
@ -81,8 +83,8 @@ import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -209,13 +211,12 @@ public class SystemSchema extends AbstractSchema
) )
{ {
Preconditions.checkNotNull(serverView, "serverView"); Preconditions.checkNotNull(serverView, "serverView");
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
this.tableMap = ImmutableMap.of( this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper), SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper),
SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper)
); );
} }
@ -674,19 +675,16 @@ public class SystemSchema extends AbstractSchema
{ {
private final DruidLeaderClient druidLeaderClient; private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper; private final AuthorizerMapper authorizerMapper;
public TasksTable( public TasksTable(
DruidLeaderClient druidLeaderClient, DruidLeaderClient druidLeaderClient,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper AuthorizerMapper authorizerMapper
) )
{ {
this.druidLeaderClient = druidLeaderClient; this.druidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.authorizerMapper = authorizerMapper; this.authorizerMapper = authorizerMapper;
} }
@ -788,7 +786,7 @@ public class SystemSchema extends AbstractSchema
} }
} }
return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler)); return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper));
} }
private CloseableIterator<TaskStatusPlus> getAuthorizedTasks( private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(
@ -819,37 +817,16 @@ public class SystemSchema extends AbstractSchema
//Note that overlord must be up to get tasks //Note that overlord must be up to get tasks
private static JsonParserIterator<TaskStatusPlus> getTasks( private static JsonParserIterator<TaskStatusPlus> getTasks(
DruidLeaderClient indexingServiceClient, DruidLeaderClient indexingServiceClient,
ObjectMapper jsonMapper, ObjectMapper jsonMapper
BytesAccumulatingResponseHandler responseHandler
) )
{ {
Request request; return getThingsFromLeaderNode(
try { "/druid/indexer/v1/tasks",
request = indexingServiceClient.makeRequest( new TypeReference<TaskStatusPlus>()
HttpMethod.GET, {
"/druid/indexer/v1/tasks", },
false indexingServiceClient,
); jsonMapper
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
{
});
return new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
); );
} }
@ -860,19 +837,16 @@ public class SystemSchema extends AbstractSchema
{ {
private final DruidLeaderClient druidLeaderClient; private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper; private final AuthorizerMapper authorizerMapper;
public SupervisorsTable( public SupervisorsTable(
DruidLeaderClient druidLeaderClient, DruidLeaderClient druidLeaderClient,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper AuthorizerMapper authorizerMapper
) )
{ {
this.druidLeaderClient = druidLeaderClient; this.druidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.authorizerMapper = authorizerMapper; this.authorizerMapper = authorizerMapper;
} }
@ -954,7 +928,7 @@ public class SystemSchema extends AbstractSchema
} }
} }
return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler)); return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper));
} }
private CloseableIterator<SupervisorStatus> getAuthorizedSupervisors( private CloseableIterator<SupervisorStatus> getAuthorizedSupervisors(
@ -985,37 +959,60 @@ public class SystemSchema extends AbstractSchema
// will fail with internal server error (HTTP 500) // will fail with internal server error (HTTP 500)
private static JsonParserIterator<SupervisorStatus> getSupervisors( private static JsonParserIterator<SupervisorStatus> getSupervisors(
DruidLeaderClient indexingServiceClient, DruidLeaderClient indexingServiceClient,
ObjectMapper jsonMapper, ObjectMapper jsonMapper
BytesAccumulatingResponseHandler responseHandler )
{
return getThingsFromLeaderNode(
"/druid/indexer/v1/supervisor?system",
new TypeReference<SupervisorStatus>()
{
},
indexingServiceClient,
jsonMapper
);
}
public static <T> JsonParserIterator<T> getThingsFromLeaderNode(
String query,
TypeReference<T> typeRef,
DruidLeaderClient leaderClient,
ObjectMapper jsonMapper
) )
{ {
Request request; Request request;
InputStreamFullResponseHolder responseHolder;
try { try {
request = indexingServiceClient.makeRequest( request = leaderClient.makeRequest(
HttpMethod.GET, HttpMethod.GET,
"/druid/indexer/v1/supervisor?system", query
false
); );
responseHolder = leaderClient.go(
request,
new InputStreamFullResponseHandler()
);
if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
throw new RE(
"Failed to talk to leader node at [%s]. Error code[%d], description[%s].",
query,
responseHolder.getStatus().getCode(),
responseHolder.getStatus().getReasonPhrase()
);
}
} }
catch (IOException e) { catch (IOException | InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SupervisorStatus>() final JavaType javaType = jsonMapper.getTypeFactory().constructType(typeRef);
{
});
return new JsonParserIterator<>( return new JsonParserIterator<>(
typeRef, javaType,
future, Futures.immediateFuture(responseHolder.getContent()),
request.getUrl().toString(), request.getUrl().toString(),
null, null,
request.getUrl().getHost(), request.getUrl().getHost(),
jsonMapper, jsonMapper
responseHandler
); );
} }

View File

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import junitparams.converters.Nullable; import junitparams.converters.Nullable;
import org.apache.calcite.DataContext; import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.java.JavaTypeFactory;
@ -54,8 +53,9 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -95,8 +95,11 @@ import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -105,10 +108,8 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import javax.servlet.http.HttpServletResponse;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
@ -1036,17 +1037,18 @@ public class SystemSchemaTest extends CalciteTestBase
{ {
SystemSchema.TasksTable tasksTable = EasyMock.createMockBuilder(SystemSchema.TasksTable.class) SystemSchema.TasksTable tasksTable = EasyMock.createMockBuilder(SystemSchema.TasksTable.class)
.withConstructor(client, mapper, responseHandler, authMapper) .withConstructor(client, mapper, authMapper)
.createMock(); .createMock();
EasyMock.replay(tasksTable);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); EasyMock.replay(tasksTable);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
String json = "[{\n" String json = "[{\n"
+ "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
@ -1082,9 +1084,8 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"errorMsg\": null\n" + "\t\"errorMsg\": null\n"
+ "}]"; + "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite); responseHolder.addChunk(bytesToWrite);
in.done(); responseHolder.done();
future.set(in);
EasyMock.replay(client, request, responseHandler); EasyMock.replay(client, request, responseHandler);
DataContext dataContext = new DataContext() DataContext dataContext = new DataContext()
@ -1159,24 +1160,24 @@ public class SystemSchemaTest extends CalciteTestBase
.withConstructor( .withConstructor(
client, client,
mapper, mapper,
responseHandler,
authMapper authMapper
) )
.createMock(); .createMock();
EasyMock.replay(supervisorTable); EasyMock.replay(supervisorTable);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false)) EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system"))
.andReturn(request) .andReturn(request)
.anyTimes(); .anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
final int ok = HttpServletResponse.SC_OK; InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
EasyMock.expect(responseHandler.getStatus()).andReturn(httpResp.getStatus().getCode()).anyTimes();
EasyMock.expect(request.getUrl()) EasyMock.expect(request.getUrl())
.andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system")) .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
.anyTimes(); .anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
String json = "[{\n" String json = "[{\n"
+ "\t\"id\": \"wikipedia\",\n" + "\t\"id\": \"wikipedia\",\n"
+ "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n" + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
@ -1190,9 +1191,8 @@ public class SystemSchemaTest extends CalciteTestBase
+ "}]"; + "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite); responseHolder.addChunk(bytesToWrite);
in.done(); responseHolder.done();
future.set(in);
EasyMock.replay(client, request, responseHandler); EasyMock.replay(client, request, responseHandler);
DataContext dataContext = new DataContext() DataContext dataContext = new DataContext()

View File

@ -93,7 +93,6 @@ import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.AllowAllAuthenticator;
@ -999,7 +998,6 @@ public class CalciteTests
new MetadataSegmentView( new MetadataSegmentView(
druidLeaderClient, druidLeaderClient,
getJsonMapper(), getJsonMapper(),
new BytesAccumulatingResponseHandler(),
new BrokerSegmentWatcherConfig(), new BrokerSegmentWatcherConfig(),
plannerConfig plannerConfig
), ),