Use current coordinator leader instead of cached one (#6551) (#6552)

* Use current coordinator leader instead of cached one (#6551)

Check the response status and throw exception if not OK

* Modify tests

* PR comment

* Add the correct check for status of BytesAccumulatingResponseHandler

* Move the status check into JsonParserIterator so sql query outputs meaningful message on failure

* Fix tests
This commit is contained in:
Surekha 2018-11-06 13:09:51 -08:00 committed by Jonathan Wei
parent 1224d8b746
commit bcb754d066
5 changed files with 44 additions and 13 deletions

View File

@ -543,7 +543,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper);
return new JsonParserIterator<T>(typeRef, future, url, query, host, objectMapper, null);
}
@Override

View File

@ -31,7 +31,9 @@ import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@ -50,6 +52,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
private final String url;
private final String host;
private final ObjectMapper objectMapper;
private final BytesAccumulatingResponseHandler responseHandler;
public JsonParserIterator(
JavaType typeRef,
@ -57,7 +60,8 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
String url,
Query<T> query,
String host,
ObjectMapper objectMapper
ObjectMapper objectMapper,
BytesAccumulatingResponseHandler responseHandler
)
{
this.typeRef = typeRef;
@ -67,6 +71,7 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
jp = null;
this.host = host;
this.objectMapper = objectMapper;
this.responseHandler = responseHandler;
}
@Override
@ -111,6 +116,14 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
if (jp == null) {
try {
InputStream is = future.get();
if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
throw new RE(
"Unexpected response status [%s] description [%s] from request url [%s]",
responseHandler.getStatus(),
responseHandler.getDescription(),
url
);
}
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(

View File

@ -120,13 +120,23 @@ public class DruidLeaderClient
log.info("Stopped.");
}
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*
* @param cached Uses cached leader if true, else uses the current leader
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
}
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
return makeRequest(httpMethod, urlPath, true);
}
public FullResponseHolder go(Request request) throws IOException, InterruptedException

View File

@ -50,6 +50,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@ -81,13 +82,14 @@ import java.util.stream.Collectors;
public class SystemSchema extends AbstractSchema
{
public static final String NAME = "sys";
private static final String SEGMENTS_TABLE = "segments";
private static final String SERVERS_TABLE = "servers";
private static final String SERVER_SEGMENTS_TABLE = "server_segments";
private static final String TASKS_TABLE = "tasks";
private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
.builder()
.add("segment_id", ValueType.STRING)
@ -402,7 +404,8 @@ public class SystemSchema extends AbstractSchema
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/coordinator/v1/metadata/segments")
StringUtils.format("/druid/coordinator/v1/metadata/segments"),
false
);
}
catch (IOException e) {
@ -412,6 +415,7 @@ public class SystemSchema extends AbstractSchema
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
@ -421,7 +425,8 @@ public class SystemSchema extends AbstractSchema
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper
jsonMapper,
responseHandler
);
}
@ -659,7 +664,8 @@ public class SystemSchema extends AbstractSchema
try {
request = indexingServiceClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/indexer/v1/tasks")
StringUtils.format("/druid/indexer/v1/tasks"),
false
);
}
catch (IOException e) {
@ -669,6 +675,7 @@ public class SystemSchema extends AbstractSchema
request,
responseHandler
);
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
{
});
@ -678,7 +685,8 @@ public class SystemSchema extends AbstractSchema
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper
jsonMapper,
responseHandler
);
}

View File

@ -339,13 +339,13 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.replay(segmentsTable);
EasyMock
.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments"))
.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false))
.andReturn(request)
.anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock
.expect(request.getUrl())
@ -599,11 +599,11 @@ public class SystemSchemaTest extends CalciteTestBase
.withConstructor(client, mapper, responseHandler, authMapper)
.createMock();
EasyMock.replay(tasksTable);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes();
SettableFuture<InputStream> future = SettableFuture.create();
EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
final int ok = HttpServletResponse.SC_OK;
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();