mirror of https://github.com/apache/druid.git
async servlet delete support + cleanup
This commit is contained in:
parent
8f7fd93491
commit
1fb9b21cf0
|
@ -22,6 +22,7 @@ package io.druid.client;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient;
|
|||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.router.Router;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
@ -68,7 +71,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
|
|||
return openConnections.get();
|
||||
}
|
||||
|
||||
public ListenableFuture<FinalType> post(
|
||||
public ListenableFuture<FinalType> postQuery(
|
||||
String url,
|
||||
Query query,
|
||||
HttpResponseHandler<IntermediateType, FinalType> responseHandler
|
||||
|
@ -81,7 +84,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
|
|||
future = httpClient
|
||||
.post(new URL(url))
|
||||
.setContent(objectMapper.writeValueAsBytes(query))
|
||||
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
|
||||
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
|
||||
.go(responseHandler);
|
||||
|
||||
openConnections.getAndIncrement();
|
||||
|
@ -125,4 +128,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
|
|||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public ListenableFuture<FinalType> delete(
|
||||
String url,
|
||||
HttpResponseHandler<IntermediateType, FinalType> responseHandler
|
||||
)
|
||||
{
|
||||
try {
|
||||
return httpClient
|
||||
.delete(new URL(url))
|
||||
.go(responseHandler);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,11 @@ package io.druid.server;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
|
@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger;
|
|||
import io.druid.server.router.QueryHostFinder;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.handler.codec.http.HttpChunk;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.annotation.WebServlet;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
|
@ -59,8 +64,6 @@ import java.util.UUID;
|
|||
public class AsyncQueryForwardingServlet extends HttpServlet
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private static final String DISPATCHED = "dispatched";
|
||||
private static final Joiner COMMA_JOIN = Joiner.on(",");
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
@ -88,275 +91,161 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse res)
|
||||
throws ServletException, IOException
|
||||
{
|
||||
OutputStream out = null;
|
||||
AsyncContext ctx = null;
|
||||
|
||||
try {
|
||||
ctx = req.startAsync(req, resp);
|
||||
final AsyncContext asyncContext = ctx;
|
||||
|
||||
if (req.getAttribute(DISPATCHED) != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
out = resp.getOutputStream();
|
||||
final OutputStream outputStream = out;
|
||||
|
||||
final String host = hostFinder.getDefaultHost();
|
||||
|
||||
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
|
||||
{
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
|
||||
final AsyncContext asyncContext = req.startAsync(req, res);
|
||||
asyncContext.start(
|
||||
new Runnable()
|
||||
{
|
||||
resp.setStatus(response.getStatus().getCode());
|
||||
resp.setContentType("application/json");
|
||||
|
||||
try {
|
||||
ChannelBuffer buf = response.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
asyncContext.complete();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
return ClientResponse.finished(outputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleChunk(
|
||||
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
|
||||
)
|
||||
{
|
||||
try {
|
||||
ChannelBuffer buf = chunk.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
asyncContext.complete();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return clientResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
|
||||
{
|
||||
final OutputStream obj = clientResponse.getObj();
|
||||
try {
|
||||
resp.flushBuffer();
|
||||
outputStream.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
return ClientResponse.finished(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<OutputStream> clientResponse,
|
||||
Throwable e
|
||||
)
|
||||
{
|
||||
handleException(resp, asyncContext, e);
|
||||
}
|
||||
};
|
||||
|
||||
asyncContext.start(
|
||||
new Runnable()
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
routingDruidClient.get(makeUrl(host, req), responseHandler);
|
||||
try {
|
||||
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
|
||||
|
||||
final String host = hostFinder.getDefaultHost();
|
||||
routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
|
||||
}
|
||||
catch (Exception e) {
|
||||
handleException(jsonMapper, asyncContext, e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
asyncContext.dispatch();
|
||||
req.setAttribute(DISPATCHED, true);
|
||||
}
|
||||
catch (Exception e) {
|
||||
handleException(resp, ctx, e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(
|
||||
final HttpServletRequest req, final HttpServletResponse resp
|
||||
) throws ServletException, IOException
|
||||
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
|
||||
{
|
||||
final long start = System.currentTimeMillis();
|
||||
Query query = null;
|
||||
String queryId;
|
||||
|
||||
final boolean isSmile = "application/smile".equals(req.getContentType());
|
||||
|
||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
|
||||
OutputStream out = null;
|
||||
AsyncContext ctx = null;
|
||||
|
||||
try {
|
||||
ctx = req.startAsync(req, resp);
|
||||
final AsyncContext asyncContext = ctx;
|
||||
|
||||
if (req.getAttribute(DISPATCHED) != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
query = objectMapper.readValue(req.getInputStream(), Query.class);
|
||||
queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = UUID.randomUUID().toString();
|
||||
query = query.withId(queryId);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
||||
out = resp.getOutputStream();
|
||||
final OutputStream outputStream = out;
|
||||
|
||||
final String host = hostFinder.getHost(query);
|
||||
|
||||
final Query theQuery = query;
|
||||
final String theQueryId = queryId;
|
||||
|
||||
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
|
||||
{
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
|
||||
final AsyncContext asyncContext = req.startAsync(req, res);
|
||||
asyncContext.start(
|
||||
new Runnable()
|
||||
{
|
||||
resp.setStatus(response.getStatus().getCode());
|
||||
resp.setContentType("application/x-javascript");
|
||||
|
||||
try {
|
||||
ChannelBuffer buf = response.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
asyncContext.complete();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return ClientResponse.finished(outputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleChunk(
|
||||
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
|
||||
)
|
||||
{
|
||||
try {
|
||||
ChannelBuffer buf = chunk.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
asyncContext.complete();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return clientResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
|
||||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
|
||||
log.debug("Request time: %d", requestTime);
|
||||
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource()))
|
||||
.setUser4(theQuery.getType())
|
||||
.setUser5(COMMA_JOIN.join(theQuery.getIntervals()))
|
||||
.setUser6(String.valueOf(theQuery.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(theQueryId)
|
||||
.setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
final OutputStream obj = clientResponse.getObj();
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
theQuery,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("request/time", requestTime, "success", true))
|
||||
)
|
||||
);
|
||||
|
||||
resp.flushBuffer();
|
||||
outputStream.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
return ClientResponse.finished(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<OutputStream> clientResponse,
|
||||
Throwable e
|
||||
)
|
||||
{
|
||||
handleException(resp, asyncContext, e);
|
||||
}
|
||||
};
|
||||
|
||||
asyncContext.start(
|
||||
new Runnable()
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
|
||||
try {
|
||||
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
|
||||
|
||||
final String host = hostFinder.getDefaultHost();
|
||||
routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
|
||||
}
|
||||
catch (Exception e) {
|
||||
handleException(jsonMapper, asyncContext, e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
asyncContext.dispatch();
|
||||
req.setAttribute(DISPATCHED, true);
|
||||
}
|
||||
catch (Exception e) {
|
||||
handleException(resp, ctx, e);
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
|
||||
{
|
||||
final long start = System.currentTimeMillis();
|
||||
final AsyncContext asyncContext = req.startAsync(req, res);
|
||||
asyncContext.start(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.error(e2, "Unable to log query [%s]!", query);
|
||||
}
|
||||
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
|
||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
|
||||
log.makeAlert(e, "Exception handling request")
|
||||
.addData("query", query)
|
||||
.addData("peer", req.getRemoteAddr())
|
||||
.emit();
|
||||
}
|
||||
Query inputQuery = null;
|
||||
try {
|
||||
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
|
||||
if (inputQuery.getId() == null) {
|
||||
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
|
||||
}
|
||||
final Query query = inputQuery;
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", inputQuery);
|
||||
}
|
||||
|
||||
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(
|
||||
asyncContext,
|
||||
objectMapper
|
||||
)
|
||||
{
|
||||
@Override
|
||||
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
|
||||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
log.debug("Request time: %d", requestTime);
|
||||
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser4(query.getType())
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(request.getRemoteAddr())
|
||||
.setUser8(query.getId())
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
request.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"request/time",
|
||||
requestTime,
|
||||
"success",
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
return super.done(clientResponse);
|
||||
}
|
||||
};
|
||||
|
||||
routingDruidClient.postQuery(
|
||||
makeUrl(hostFinder.getHost(inputQuery), request),
|
||||
inputQuery,
|
||||
responseHandler
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
handleException(objectMapper, asyncContext, e);
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
request.getRemoteAddr(),
|
||||
inputQuery,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception logError) {
|
||||
log.error(logError, "Unable to log query [%s]!", inputQuery);
|
||||
}
|
||||
|
||||
log.makeAlert(e, "Exception handling request")
|
||||
.addData("query", inputQuery)
|
||||
.addData("peer", request.getRemoteAddr())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private String makeUrl(final String host, final HttpServletRequest req)
|
||||
|
@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
return String.format("http://%s%s?%s", host, requestURI, queryString);
|
||||
}
|
||||
|
||||
private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e)
|
||||
private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception)
|
||||
{
|
||||
try {
|
||||
final ServletOutputStream out = resp.getOutputStream();
|
||||
if (!resp.isCommitted()) {
|
||||
resp.setStatus(500);
|
||||
resp.resetBuffer();
|
||||
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
|
||||
out.write("\n".getBytes(UTF8));
|
||||
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
|
||||
if (!response.isCommitted()) {
|
||||
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
|
||||
|
||||
response.resetBuffer();
|
||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
objectMapper.writeValue(
|
||||
response.getOutputStream(),
|
||||
ImmutableMap.of(
|
||||
"error", errorMessage
|
||||
)
|
||||
);
|
||||
}
|
||||
response.flushBuffer();
|
||||
}
|
||||
catch (IOException e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
asyncContext.complete();
|
||||
}
|
||||
}
|
||||
|
||||
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<OutputStream, OutputStream>
|
||||
{
|
||||
private final AsyncContext asyncContext;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final OutputStream outputStream;
|
||||
|
||||
public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException
|
||||
{
|
||||
this.asyncContext = asyncContext;
|
||||
this.objectMapper = objectMapper;
|
||||
this.outputStream = asyncContext.getResponse().getOutputStream();
|
||||
}
|
||||
|
||||
protected void copyStatusHeaders(HttpResponse clientResponse)
|
||||
{
|
||||
final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
|
||||
response.setStatus(clientResponse.getStatus().getCode());
|
||||
response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE));
|
||||
|
||||
FluentIterable.from(clientResponse.headers().entries())
|
||||
.filter(new Predicate<Map.Entry<String, String>>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable Map.Entry<String, String> input)
|
||||
{
|
||||
return input.getKey().startsWith("X-Druid");
|
||||
}
|
||||
}
|
||||
)
|
||||
.transform(
|
||||
new Function<Map.Entry<String, String>, Object>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Object apply(@Nullable Map.Entry<String, String> input)
|
||||
{
|
||||
response.setHeader(input.getKey(), input.getValue());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
)
|
||||
.allMatch(Predicates.alwaysTrue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleResponse(HttpResponse clientResponse)
|
||||
{
|
||||
copyStatusHeaders(clientResponse);
|
||||
|
||||
try {
|
||||
ChannelBuffer buf = clientResponse.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
if (ctx != null) {
|
||||
ctx.complete();
|
||||
}
|
||||
resp.flushBuffer();
|
||||
return ClientResponse.finished(outputStream);
|
||||
}
|
||||
catch (IOException e1) {
|
||||
Throwables.propagate(e1);
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> handleChunk(
|
||||
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
|
||||
)
|
||||
{
|
||||
try {
|
||||
ChannelBuffer buf = chunk.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return clientResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
|
||||
{
|
||||
asyncContext.complete();
|
||||
return ClientResponse.finished(clientResponse.getObj());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<OutputStream> clientResponse,
|
||||
Throwable e
|
||||
)
|
||||
{
|
||||
// throwing an exception here may cause resource leak
|
||||
try {
|
||||
handleException(objectMapper, asyncContext, e);
|
||||
} catch(Exception err) {
|
||||
log.error(err, "Unable to handle exception response");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue