refactor json handling to use byte[] instead of string for better performance, storage, and memory consumption (apply to count and delete by query)
This commit is contained in:
parent
c111e1ab80
commit
defb6a336d
|
@ -38,7 +38,7 @@ public class CountRequest extends BroadcastOperationRequest {
|
|||
public static final float DEFAULT_MIN_SCORE = -1f;
|
||||
|
||||
private float minScore = DEFAULT_MIN_SCORE;
|
||||
@Required private String querySource;
|
||||
@Required private byte[] querySource;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
@Nullable private String queryParserName;
|
||||
|
||||
|
@ -73,15 +73,15 @@ public class CountRequest extends BroadcastOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
String querySource() {
|
||||
byte[] querySource() {
|
||||
return querySource;
|
||||
}
|
||||
|
||||
@Required public CountRequest querySource(QueryBuilder queryBuilder) {
|
||||
return querySource(queryBuilder.buildAsString());
|
||||
return querySource(queryBuilder.buildAsBytes());
|
||||
}
|
||||
|
||||
public CountRequest querySource(String querySource) {
|
||||
public CountRequest querySource(byte[] querySource) {
|
||||
this.querySource = querySource;
|
||||
return this;
|
||||
}
|
||||
|
@ -107,7 +107,8 @@ public class CountRequest extends BroadcastOperationRequest {
|
|||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
minScore = in.readFloat();
|
||||
querySource = in.readUTF();
|
||||
querySource = new byte[in.readInt()];
|
||||
in.readFully(querySource());
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -123,7 +124,8 @@ public class CountRequest extends BroadcastOperationRequest {
|
|||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeFloat(minScore);
|
||||
out.writeUTF(querySource);
|
||||
out.writeInt(querySource.length);
|
||||
out.write(querySource);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.io.IOException;
|
|||
public class ShardCountRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
private float minScore;
|
||||
private String querySource;
|
||||
private byte[] querySource;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
@Nullable private String queryParserName;
|
||||
|
||||
|
@ -41,7 +41,7 @@ public class ShardCountRequest extends BroadcastShardOperationRequest {
|
|||
|
||||
}
|
||||
|
||||
public ShardCountRequest(String index, int shardId, String querySource, float minScore,
|
||||
public ShardCountRequest(String index, int shardId, byte[] querySource, float minScore,
|
||||
@Nullable String queryParserName, String... types) {
|
||||
super(index, shardId);
|
||||
this.minScore = minScore;
|
||||
|
@ -54,7 +54,7 @@ public class ShardCountRequest extends BroadcastShardOperationRequest {
|
|||
return minScore;
|
||||
}
|
||||
|
||||
public String querySource() {
|
||||
public byte[] querySource() {
|
||||
return querySource;
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,8 @@ public class ShardCountRequest extends BroadcastShardOperationRequest {
|
|||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
minScore = in.readFloat();
|
||||
querySource = in.readUTF();
|
||||
querySource = new byte[in.readInt()];
|
||||
in.readFully(querySource);
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -85,7 +86,8 @@ public class ShardCountRequest extends BroadcastShardOperationRequest {
|
|||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeFloat(minScore);
|
||||
out.writeUTF(querySource);
|
||||
out.writeInt(querySource.length);
|
||||
out.write(querySource);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -34,7 +34,7 @@ import java.io.IOException;
|
|||
*/
|
||||
public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
|
||||
|
||||
private String querySource;
|
||||
private byte[] querySource;
|
||||
private String queryParserName;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
||||
|
@ -50,15 +50,15 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
String querySource() {
|
||||
byte[] querySource() {
|
||||
return querySource;
|
||||
}
|
||||
|
||||
@Required public DeleteByQueryRequest querySource(QueryBuilder queryBuilder) {
|
||||
return querySource(queryBuilder.buildAsString());
|
||||
return querySource(queryBuilder.buildAsBytes());
|
||||
}
|
||||
|
||||
@Required public DeleteByQueryRequest querySource(String querySource) {
|
||||
@Required public DeleteByQueryRequest querySource(byte[] querySource) {
|
||||
this.querySource = querySource;
|
||||
return this;
|
||||
}
|
||||
|
@ -88,7 +88,8 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
|
|||
|
||||
public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
querySource = in.readUTF();
|
||||
querySource = new byte[in.readInt()];
|
||||
in.readFully(querySource);
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -96,7 +97,8 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
|
|||
|
||||
public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeUTF(querySource);
|
||||
out.writeInt(querySource.length);
|
||||
out.write(querySource);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -37,7 +37,7 @@ import static org.elasticsearch.action.Actions.*;
|
|||
*/
|
||||
public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest {
|
||||
|
||||
private String querySource;
|
||||
private byte[] querySource;
|
||||
private String queryParserName;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
|
|||
IndexDeleteByQueryRequest() {
|
||||
}
|
||||
|
||||
String querySource() {
|
||||
byte[] querySource() {
|
||||
return querySource;
|
||||
}
|
||||
|
||||
|
@ -71,10 +71,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
|
|||
}
|
||||
|
||||
@Required public IndexDeleteByQueryRequest querySource(QueryBuilder queryBuilder) {
|
||||
return querySource(queryBuilder.buildAsString());
|
||||
return querySource(queryBuilder.buildAsBytes());
|
||||
}
|
||||
|
||||
@Required public IndexDeleteByQueryRequest querySource(String querySource) {
|
||||
@Required public IndexDeleteByQueryRequest querySource(byte[] querySource) {
|
||||
this.querySource = querySource;
|
||||
return this;
|
||||
}
|
||||
|
@ -99,7 +99,8 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
|
|||
|
||||
public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
querySource = in.readUTF();
|
||||
querySource = new byte[in.readInt()];
|
||||
in.readFully(querySource);
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -114,7 +115,8 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
|
|||
|
||||
public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeUTF(querySource);
|
||||
out.writeInt(querySource.length);
|
||||
out.write(querySource);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -36,11 +36,11 @@ import static org.elasticsearch.action.Actions.*;
|
|||
public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest {
|
||||
|
||||
private int shardId;
|
||||
private String querySource;
|
||||
private byte[] querySource;
|
||||
private String queryParserName;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
||||
public ShardDeleteByQueryRequest(String index, String querySource, @Nullable String queryParserName, String[] types, int shardId) {
|
||||
public ShardDeleteByQueryRequest(String index, byte[] querySource, @Nullable String queryParserName, String[] types, int shardId) {
|
||||
this.index = index;
|
||||
this.querySource = querySource;
|
||||
this.queryParserName = queryParserName;
|
||||
|
@ -68,7 +68,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
|
|||
return this.shardId;
|
||||
}
|
||||
|
||||
public String querySource() {
|
||||
public byte[] querySource() {
|
||||
return querySource;
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
|
|||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
querySource = in.readUTF();
|
||||
querySource = new byte[in.readInt()];
|
||||
in.readFully(querySource);
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -98,7 +99,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
|
|||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeUTF(querySource);
|
||||
out.writeInt(querySource.length);
|
||||
out.write(querySource);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -314,10 +314,10 @@ public interface Engine extends IndexShardComponent {
|
|||
static class DeleteByQuery {
|
||||
private final Query query;
|
||||
private final String queryParserName;
|
||||
private final String source;
|
||||
private final byte[] source;
|
||||
private final String[] types;
|
||||
|
||||
public DeleteByQuery(Query query, String source, @Nullable String queryParserName, String... types) {
|
||||
public DeleteByQuery(Query query, byte[] source, @Nullable String queryParserName, String... types) {
|
||||
this.query = query;
|
||||
this.source = source;
|
||||
this.queryParserName = queryParserName;
|
||||
|
@ -332,7 +332,7 @@ public interface Engine extends IndexShardComponent {
|
|||
return this.query;
|
||||
}
|
||||
|
||||
public String source() {
|
||||
public byte[] source() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
|
|
|
@ -53,11 +53,11 @@ public interface IndexShard extends IndexShardComponent {
|
|||
|
||||
void delete(Term uid);
|
||||
|
||||
void deleteByQuery(String querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
|
||||
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
|
||||
|
||||
byte[] get(String type, String id) throws ElasticSearchException;
|
||||
|
||||
long count(float minScore, String querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
|
||||
long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
|
||||
|
||||
void refresh(Engine.Refresh refresh) throws ElasticSearchException;
|
||||
|
||||
|
|
|
@ -254,7 +254,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
engine.delete(new Engine.Delete(uid));
|
||||
}
|
||||
|
||||
public void deleteByQuery(String querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
|
||||
public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
if (types == null) {
|
||||
types = Strings.EMPTY_ARRAY;
|
||||
|
@ -262,7 +262,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
innerDeleteByQuery(querySource, queryParserName, types);
|
||||
}
|
||||
|
||||
private void innerDeleteByQuery(String querySource, String queryParserName, String... types) {
|
||||
private void innerDeleteByQuery(byte[] querySource, String queryParserName, String... types) {
|
||||
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
|
||||
if (queryParserName != null) {
|
||||
queryParser = queryParserService.indexQueryParser(queryParserName);
|
||||
|
@ -307,7 +307,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
}
|
||||
}
|
||||
|
||||
public long count(float minScore, String querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
|
||||
public long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
|
||||
readAllowed();
|
||||
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
|
||||
if (queryParserName != null) {
|
||||
|
|
|
@ -303,7 +303,7 @@ public interface Translog extends IndexShardComponent {
|
|||
}
|
||||
|
||||
static class DeleteByQuery implements Operation {
|
||||
private String source;
|
||||
private byte[] source;
|
||||
@Nullable private String queryParserName;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
||||
|
@ -314,7 +314,7 @@ public interface Translog extends IndexShardComponent {
|
|||
this(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.types());
|
||||
}
|
||||
|
||||
public DeleteByQuery(String source, @Nullable String queryParserName, String... types) {
|
||||
public DeleteByQuery(byte[] source, @Nullable String queryParserName, String... types) {
|
||||
this.queryParserName = queryParserName;
|
||||
this.source = source;
|
||||
this.types = types;
|
||||
|
@ -325,14 +325,14 @@ public interface Translog extends IndexShardComponent {
|
|||
}
|
||||
|
||||
@Override public long estimateSize() {
|
||||
return ((source.length() + (queryParserName == null ? 0 : queryParserName.length())) * 2) + 8;
|
||||
return source.length + ((queryParserName == null ? 0 : queryParserName.length()) * 2) + 8;
|
||||
}
|
||||
|
||||
public String queryParserName() {
|
||||
return this.queryParserName;
|
||||
}
|
||||
|
||||
public String source() {
|
||||
public byte[] source() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
|
@ -345,7 +345,8 @@ public interface Translog extends IndexShardComponent {
|
|||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
source = in.readUTF();
|
||||
source = new byte[in.readInt()];
|
||||
in.readFully(source);
|
||||
if (in.readBoolean()) {
|
||||
queryParserName = in.readUTF();
|
||||
}
|
||||
|
@ -359,7 +360,8 @@ public interface Translog extends IndexShardComponent {
|
|||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeUTF(source);
|
||||
out.writeInt(source.length);
|
||||
out.write(source);
|
||||
if (queryParserName == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
|
|
|
@ -70,9 +70,9 @@ public class RestActions {
|
|||
builder.endObject();
|
||||
}
|
||||
|
||||
public static String parseQuerySource(RestRequest request) {
|
||||
public static byte[] parseQuerySource(RestRequest request) {
|
||||
if (request.hasContent()) {
|
||||
return request.contentAsString();
|
||||
return request.contentAsBytes();
|
||||
}
|
||||
String queryString = request.param("q");
|
||||
if (queryString == null) {
|
||||
|
@ -91,7 +91,7 @@ public class RestActions {
|
|||
throw new ElasticSearchIllegalArgumentException("Unsupported defaultOperator [" + defaultOperator + "], can either be [OR] or [AND]");
|
||||
}
|
||||
}
|
||||
return queryBuilder.buildAsString();
|
||||
return queryBuilder.buildAsBytes();
|
||||
}
|
||||
|
||||
public static String[] splitIndices(String indices) {
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.testng.annotations.AfterMethod;
|
|||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.index.query.json.JsonQueryBuilders.*;
|
||||
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
@ -91,9 +92,9 @@ public class SimpleIndexShardTests {
|
|||
|
||||
assertThat(sourceFetched, equalTo(source1));
|
||||
|
||||
assertThat(indexShard.count(0, "{ term : { age : 35 } }", null), equalTo(1l));
|
||||
assertThat(indexShard.count(0, "{ queryString : { query : \"name:test\" } }", null), equalTo(1l));
|
||||
assertThat(indexShard.count(0, "{ queryString : { query : \"age:35\" } }", null), equalTo(1l));
|
||||
assertThat(indexShard.count(0, termQuery("age", 35).buildAsBytes(), null), equalTo(1l));
|
||||
assertThat(indexShard.count(0, queryString("name:test").buildAsBytes(), null), equalTo(1l));
|
||||
assertThat(indexShard.count(0, queryString("age:35").buildAsBytes(), null), equalTo(1l));
|
||||
|
||||
indexShard.delete("type1", "1");
|
||||
indexShard.refresh(new Engine.Refresh(true));
|
||||
|
@ -104,7 +105,7 @@ public class SimpleIndexShardTests {
|
|||
indexShard.refresh(new Engine.Refresh(true));
|
||||
sourceFetched = Unicode.fromBytes(indexShard.get("type1", "1"));
|
||||
assertThat(sourceFetched, equalTo(source1));
|
||||
indexShard.deleteByQuery("{ term : { name : \"test\" } }", null);
|
||||
indexShard.deleteByQuery(termQuery("name", "test").buildAsBytes(), null);
|
||||
indexShard.refresh(new Engine.Refresh(true));
|
||||
assertThat(indexShard.get("type1", "1"), nullValue());
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ public abstract class AbstractSimpleTranslogTests {
|
|||
assertThat(snapshot, translogSize(3));
|
||||
snapshot.release();
|
||||
|
||||
translog.add(new Translog.DeleteByQuery("{4}", null));
|
||||
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null));
|
||||
snapshot = translog.snapshot();
|
||||
assertThat(snapshot, translogSize(4));
|
||||
snapshot.release();
|
||||
|
@ -88,7 +88,7 @@ public abstract class AbstractSimpleTranslogTests {
|
|||
Translog.Delete delete = (Translog.Delete) it.next();
|
||||
assertThat(delete.uid(), equalTo(newUid("3")));
|
||||
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) it.next();
|
||||
assertThat(deleteByQuery.source(), equalTo("{4}"));
|
||||
assertThat(deleteByQuery.source(), equalTo(new byte[]{4}));
|
||||
snapshot.release();
|
||||
|
||||
long firstId = translog.currentId();
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.client.transport.TransportClient;
|
|||
import org.elasticsearch.server.internal.InternalServer;
|
||||
import org.elasticsearch.test.integration.AbstractServersTests;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.Unicode;
|
||||
import org.elasticsearch.util.transport.TransportAddress;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -150,7 +151,7 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests {
|
|||
assertThat(countResponse.successfulShards(), equalTo(5));
|
||||
assertThat(countResponse.failedShards(), equalTo(0));
|
||||
// test failed (simply query that can't be parsed)
|
||||
countResponse = client.count(countRequest("test").querySource("{ term : { _type : \"type1 } }")).actionGet();
|
||||
countResponse = client.count(countRequest("test").querySource(Unicode.fromStringAsBytes("{ term : { _type : \"type1 } }"))).actionGet();
|
||||
|
||||
assertThat(countResponse.count(), equalTo(0l));
|
||||
assertThat(countResponse.successfulShards(), equalTo(0));
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.action.get.GetResponse;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.test.integration.AbstractServersTests;
|
||||
import org.elasticsearch.util.Unicode;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
|
@ -137,7 +138,7 @@ public class DocumentActionsTests extends AbstractServersTests {
|
|||
assertThat(countResponse.failedShards(), equalTo(0));
|
||||
|
||||
// test failed (simply query that can't be parsed)
|
||||
countResponse = client("server1").count(countRequest("test").querySource("{ term : { _type : \"type1 } }")).actionGet();
|
||||
countResponse = client("server1").count(countRequest("test").querySource(Unicode.fromStringAsBytes("{ term : { _type : \"type1 } }"))).actionGet();
|
||||
|
||||
assertThat(countResponse.count(), equalTo(0l));
|
||||
assertThat(countResponse.successfulShards(), equalTo(0));
|
||||
|
|
Loading…
Reference in New Issue