when using compressed source, extract the compressed source directly into the response without any buffering

This commit is contained in:
kimchy 2010-08-19 18:29:36 +03:00
parent 46ccee8f89
commit 19abe7a2a5
10 changed files with 168 additions and 59 deletions

View File

@ -24,11 +24,12 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZFDecoder; import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
@ -45,7 +46,7 @@ import static org.elasticsearch.common.collect.Maps.*;
* @see GetRequest * @see GetRequest
* @see org.elasticsearch.client.Client#get(GetRequest) * @see org.elasticsearch.client.Client#get(GetRequest)
*/ */
public class GetResponse implements ActionResponse, Streamable, Iterable<GetField> { public class GetResponse implements ActionResponse, Streamable, Iterable<GetField>, ToXContent {
private String index; private String index;
@ -216,6 +217,63 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
return fields.values().iterator(); return fields.values().iterator();
} }
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
if (!exists()) {
builder.startObject();
builder.field("_index", index);
builder.field("_type", type);
builder.field("_id", id);
builder.endObject();
} else {
builder.startObject();
builder.field("_index", index);
builder.field("_type", type);
builder.field("_id", id);
if (source != null) {
if (LZFDecoder.isCompressed(source)) {
BytesStreamInput siBytes = new BytesStreamInput(source);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
if (contentType == builder.contentType()) {
builder.rawField("_source", siLzf);
} else {
builder.field("_source", XContentFactory.xContent(builder.contentType()).createParser(siLzf).map());
}
} else {
if (XContentFactory.xContentType(source) == builder.contentType()) {
builder.rawField("_source", source);
} else {
builder.field("_source", XContentFactory.xContent(builder.contentType()).createParser(source).map());
}
}
}
if (fields != null && !fields.isEmpty()) {
builder.startObject("fields");
for (GetField field : fields.values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
builder.endObject();
}
}
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF(); index = in.readUTF();
type = in.readUTF(); type = in.readUTF();

View File

@ -89,6 +89,11 @@ public abstract class Streams {
// Copy methods for java.io.InputStream / java.io.OutputStream // Copy methods for java.io.InputStream / java.io.OutputStream
//--------------------------------------------------------------------- //---------------------------------------------------------------------
public static long copy(InputStream in, OutputStream out) throws IOException {
return copy(in, out, new byte[BUFFER_SIZE]);
}
/** /**
* Copy the contents of the given InputStream to the given OutputStream. * Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done. * Closes both streams when done.
@ -98,12 +103,11 @@ public abstract class Streams {
* @return the number of bytes copied * @return the number of bytes copied
* @throws IOException in case of I/O errors * @throws IOException in case of I/O errors
*/ */
public static long copy(InputStream in, OutputStream out) throws IOException { public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Preconditions.checkNotNull(in, "No InputStream specified"); Preconditions.checkNotNull(in, "No InputStream specified");
Preconditions.checkNotNull(out, "No OutputStream specified"); Preconditions.checkNotNull(out, "No OutputStream specified");
try { try {
long byteCount = 0; long byteCount = 0;
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead; int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) { while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead); out.write(buffer, 0, bytesRead);

View File

@ -46,6 +46,12 @@ public class CachedStreamInput {
} }
}; };
public static LZFStreamInput cachedLzf(StreamInput in) throws IOException {
LZFStreamInput lzf = cache.get().get().lzf;
lzf.reset(in);
return lzf;
}
public static HandlesStreamInput cachedHandles(StreamInput in) { public static HandlesStreamInput cachedHandles(StreamInput in) {
HandlesStreamInput handles = cache.get().get().handles; HandlesStreamInput handles = cache.get().get().handles;
handles.reset(in); handles.reset(in);

View File

@ -112,6 +112,13 @@ public class LZFStreamInput extends StreamInput {
readyBuffer(); readyBuffer();
} }
/**
* Expert!, resets to buffer start, without the need to decompress it again.
*/
public void resetToBufferStart() {
this.bufferPosition = 0;
}
@Override public void close() throws IOException { @Override public void close() throws IOException {
in.close(); in.close();
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.xson.XsonXContent; import org.elasticsearch.common.xcontent.xson.XsonXContent;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
/** /**
@ -151,6 +152,36 @@ public class XContentFactory {
return xContentType(data, 0, data.length); return xContentType(data, 0, data.length);
} }
/**
* Guesses the content type based on the provided input stream.
*/
public static XContentType xContentType(InputStream si) throws IOException {
int first = si.read();
if (first == -1) {
return null;
}
int second = si.read();
if (second == -1) {
return null;
}
if (first == 0x00 && second == 0x00) {
return XContentType.XSON;
}
if (first == '{' || second == '{') {
return XContentType.JSON;
}
for (int i = 2; i < GUESS_HEADER_LENGTH; i++) {
int val = si.read();
if (val == -1) {
return null;
}
if (val == '{') {
return XContentType.JSON;
}
}
return null;
}
/** /**
* Guesses the content type based on the provided bytes. * Guesses the content type based on the provided bytes.
*/ */

View File

@ -21,9 +21,11 @@ package org.elasticsearch.common.xcontent.builder;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -34,6 +36,8 @@ public class BinaryXContentBuilder extends XContentBuilder<BinaryXContentBuilder
private final XContent xContent; private final XContent xContent;
private byte[] bytes;
public BinaryXContentBuilder(XContent xContent) throws IOException { public BinaryXContentBuilder(XContent xContent) throws IOException {
this.bos = new FastByteArrayOutputStream(); this.bos = new FastByteArrayOutputStream();
this.xContent = xContent; this.xContent = xContent;
@ -47,6 +51,15 @@ public class BinaryXContentBuilder extends XContentBuilder<BinaryXContentBuilder
return this; return this;
} }
@Override public BinaryXContentBuilder raw(InputStream content) throws IOException {
flush();
if (bytes == null) {
bytes = new byte[Streams.BUFFER_SIZE];
}
Streams.copy(content, bos, bytes);
return this;
}
@Override public BinaryXContentBuilder reset() throws IOException { @Override public BinaryXContentBuilder reset() throws IOException {
fieldCaseConversion = globalFieldCaseConversion; fieldCaseConversion = globalFieldCaseConversion;
bos.reset(); bos.reset();

View File

@ -21,10 +21,13 @@ package org.elasticsearch.common.xcontent.builder;
import org.apache.lucene.util.UnicodeUtil; import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.FastCharArrayWriter; import org.elasticsearch.common.io.FastCharArrayWriter;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -44,9 +47,18 @@ public class TextXContentBuilder extends XContentBuilder<TextXContentBuilder> {
this.builder = this; this.builder = this;
} }
@Override public TextXContentBuilder raw(byte[] json) throws IOException { @Override public TextXContentBuilder raw(byte[] content) throws IOException {
flush(); flush();
Unicode.UTF16Result result = Unicode.unsafeFromBytesAsUtf16(json); Unicode.UTF16Result result = Unicode.unsafeFromBytesAsUtf16(content);
writer.write(result.result, 0, result.length);
return this;
}
@Override public TextXContentBuilder raw(InputStream content) throws IOException {
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
Streams.copy(content, os);
flush();
Unicode.UTF16Result result = Unicode.unsafeFromBytesAsUtf16(os.unsafeByteArray(), 0, os.size());
writer.write(result.result, 0, result.length); writer.write(result.result, 0, result.length);
return this; return this;
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapConverter; import org.elasticsearch.common.xcontent.support.XContentMapConverter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
@ -288,8 +289,15 @@ public abstract class XContentBuilder<T extends XContentBuilder> {
return raw(content); return raw(content);
} }
public T rawField(String fieldName, InputStream content) throws IOException {
generator.writeRawFieldStart(fieldName);
return raw(content);
}
public abstract T raw(byte[] content) throws IOException; public abstract T raw(byte[] content) throws IOException;
public abstract T raw(InputStream content) throws IOException;
public T value(Boolean value) throws IOException { public T value(Boolean value) throws IOException {
return value(value.booleanValue()); return value(value.booleanValue());
} }

View File

@ -20,13 +20,11 @@
package org.elasticsearch.rest.action.get; package org.elasticsearch.rest.action.get;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetField;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.builder.XContentBuilder; import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.*;
@ -72,52 +70,13 @@ public class RestGetAction extends BaseRestHandler {
client.get(getRequest, new ActionListener<GetResponse>() { client.get(getRequest, new ActionListener<GetResponse>() {
@Override public void onResponse(GetResponse response) { @Override public void onResponse(GetResponse response) {
try { try {
XContentBuilder builder = restContentBuilder(request);
response.toXContent(builder, request);
if (!response.exists()) { if (!response.exists()) {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field("_index", response.index());
builder.field("_type", response.type());
builder.field("_id", response.id());
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, NOT_FOUND, builder)); channel.sendResponse(new XContentRestResponse(request, NOT_FOUND, builder));
} else { } else {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field("_index", response.index());
builder.field("_type", response.type());
builder.field("_id", response.id());
if (response.source() != null) {
if (builder.contentType() == XContentFactory.xContentType(response.source())) {
builder.rawField("_source", response.source());
} else {
builder.field("_source");
builder.value(response.source());
}
}
if (response.fields() != null && !response.fields().isEmpty()) {
builder.startObject("fields");
for (GetField field : response.fields().values()) {
if (field.values().isEmpty()) {
continue;
}
if (field.values().size() == 1) {
builder.field(field.name(), field.values().get(0));
} else {
builder.field(field.name());
builder.startArray();
for (Object value : field.values()) {
builder.value(value);
}
builder.endArray();
}
}
builder.endObject();
}
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder)); channel.sendResponse(new XContentRestResponse(request, OK, builder));
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -24,11 +24,11 @@ import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZFDecoder; import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.trove.TIntObjectHashMap; import org.elasticsearch.common.trove.TIntObjectHashMap;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.XContentBuilder; import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHitField;
@ -259,12 +259,23 @@ public class InternalSearchHit implements SearchHit {
} else { } else {
builder.field("_score", score); builder.field("_score", score);
} }
if (source() != null) { if (source != null) {
if (XContentFactory.xContentType(source()) == builder.contentType()) { if (LZFDecoder.isCompressed(source)) {
builder.rawField("_source", source()); BytesStreamInput siBytes = new BytesStreamInput(source);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
if (contentType == builder.contentType()) {
builder.rawField("_source", siLzf);
} else {
builder.field("_source", XContentFactory.xContent(builder.contentType()).createParser(siLzf).map());
}
} else { } else {
builder.field("_source"); if (XContentFactory.xContentType(source) == builder.contentType()) {
builder.value(source()); builder.rawField("_source", source);
} else {
builder.field("_source", XContentFactory.xContent(builder.contentType()).createParser(source).map());
}
} }
} }
if (fields != null && !fields.isEmpty()) { if (fields != null && !fields.isEmpty()) {