diff --git a/plugins/transport/thrift/src/main/java/org/elasticsearch/thrift/ThriftRestImpl.java b/plugins/transport/thrift/src/main/java/org/elasticsearch/thrift/ThriftRestImpl.java index 6c42e151e19..a4afd9e1fbd 100644 --- a/plugins/transport/thrift/src/main/java/org/elasticsearch/thrift/ThriftRestImpl.java +++ b/plugins/transport/thrift/src/main/java/org/elasticsearch/thrift/ThriftRestImpl.java @@ -49,16 +49,20 @@ public class ThriftRestImpl extends AbstractComponent implements Rest.Iface { logger.trace("thrift message {}", request); } final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference ref = new AtomicReference(); + final AtomicReference ref = new AtomicReference(); restController.dispatchRequest(new ThriftRestRequest(request), new RestChannel() { @Override public void sendResponse(RestResponse response) { - ref.set(response); + try { + ref.set(convert(response)); + } catch (IOException e) { + // ignore, should not happen... + } latch.countDown(); } }); try { latch.await(); - return convert(ref.get()); + return ref.get(); } catch (Exception e) { throw new TException("failed to generate response", e); } @@ -67,7 +71,14 @@ public class ThriftRestImpl extends AbstractComponent implements Rest.Iface { private org.elasticsearch.thrift.RestResponse convert(RestResponse response) throws IOException { org.elasticsearch.thrift.RestResponse tResponse = new org.elasticsearch.thrift.RestResponse(getStatus(response.status())); if (response.contentLength() > 0) { - tResponse.setBody(ByteBuffer.wrap(response.content(), 0, response.contentLength())); + if (response.contentThreadSafe()) { + tResponse.setBody(ByteBuffer.wrap(response.content(), 0, response.contentLength())); + } else { + // argh!, we need to copy it over since we are not on the same thread... + byte[] body = new byte[response.contentLength()]; + System.arraycopy(response.content(), 0, body, 0, response.contentLength()); + tResponse.setBody(ByteBuffer.wrap(body)); + } } return tResponse; }