mirror of https://github.com/apache/druid.git
RemoteTaskActionClient: Better error messages for non-2xx responses.
This commit is contained in:
parent
3b29e77866
commit
ca0a4bd8a5
|
@ -26,7 +26,8 @@ import com.google.common.base.Throwables;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.ToStringResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.indexing.common.RetryPolicy;
|
||||
|
@ -37,6 +38,7 @@ import org.joda.time.Duration;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
|
||||
public class RemoteTaskActionClient implements TaskActionClient
|
||||
|
@ -75,22 +77,25 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
final Server server;
|
||||
final URI serviceUri;
|
||||
try {
|
||||
serviceUri = getServiceUri();
|
||||
server = getServiceInstance();
|
||||
serviceUri = makeServiceUri(server);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Want to retry, so throw an IOException.
|
||||
throw new IOException("Failed to locate service uri", e);
|
||||
}
|
||||
|
||||
final String response;
|
||||
final StatusResponseHolder response;
|
||||
|
||||
log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
|
||||
|
||||
try {
|
||||
response = httpClient.post(serviceUri.toURL())
|
||||
.setContent("application/json", dataToSend)
|
||||
.go(new ToStringResponseHandler(Charsets.UTF_8))
|
||||
.go(new StatusResponseHandler(Charsets.UTF_8))
|
||||
.get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -99,13 +104,24 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
final Map<String, Object> responseDict = jsonMapper.readValue(
|
||||
response, new TypeReference<Map<String, Object>>()
|
||||
{
|
||||
if (response.getStatus().getCode() / 200 == 1) {
|
||||
final Map<String, Object> responseDict = jsonMapper.readValue(
|
||||
response.getContent(),
|
||||
new TypeReference<Map<String, Object>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
|
||||
} else {
|
||||
// Want to retry, so throw an IOException.
|
||||
throw new IOException(
|
||||
String.format(
|
||||
"Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
|
||||
response.getStatus(),
|
||||
server.getHost()
|
||||
)
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
|
||||
}
|
||||
catch (IOException | ChannelException e) {
|
||||
log.warn(e, "Exception submitting action for task[%s]", task.getId());
|
||||
|
@ -127,13 +143,18 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
}
|
||||
}
|
||||
|
||||
private URI getServiceUri() throws Exception
|
||||
private URI makeServiceUri(final Server instance) throws URISyntaxException
|
||||
{
|
||||
return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
|
||||
}
|
||||
|
||||
private Server getServiceInstance()
|
||||
{
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexer to talk to!");
|
||||
} else {
|
||||
return instance;
|
||||
}
|
||||
|
||||
return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue