delete by query to use byte reference serialization

This commit is contained in:
Shay Banon 2012-01-08 20:52:48 +02:00
parent 0cc906aa21
commit 0f1b3f0457
8 changed files with 37 additions and 60 deletions

View File

@ -26,10 +26,7 @@ import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest; import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.*;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -52,7 +49,6 @@ import static org.elasticsearch.action.Actions.addValidationError;
* <p>The request requires the query source to be set either using {@link #query(org.elasticsearch.index.query.QueryBuilder)}, * <p>The request requires the query source to be set either using {@link #query(org.elasticsearch.index.query.QueryBuilder)},
* or {@link #query(byte[])}. * or {@link #query(byte[])}.
* *
*
* @see DeleteByQueryResponse * @see DeleteByQueryResponse
* @see org.elasticsearch.client.Requests#deleteByQueryRequest(String...) * @see org.elasticsearch.client.Requests#deleteByQueryRequest(String...)
* @see org.elasticsearch.client.Client#deleteByQuery(DeleteByQueryRequest) * @see org.elasticsearch.client.Client#deleteByQuery(DeleteByQueryRequest)
@ -110,13 +106,13 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
/** /**
* The query source to execute. * The query source to execute.
*/ */
byte[] querySource() { BytesHolder querySource() {
if (querySourceUnsafe || querySourceOffset > 0) { if (querySourceUnsafe) {
querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceOffset + querySourceLength); querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceOffset + querySourceLength);
querySourceOffset = 0; querySourceOffset = 0;
querySourceUnsafe = false; querySourceUnsafe = false;
} }
return querySource; return new BytesHolder(querySource, querySourceOffset, querySourceLength);
} }
/** /**
@ -274,11 +270,11 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
BytesHolder bytes = in.readBytesReference();
querySourceUnsafe = false; querySourceUnsafe = false;
querySourceOffset = 0; querySource = bytes.bytes();
querySourceLength = in.readVInt(); querySourceOffset = bytes.offset();
querySource = new byte[querySourceLength]; querySourceLength = bytes.length();
in.readFully(querySource);
if (in.readBoolean()) { if (in.readBoolean()) {
routing = in.readUTF(); routing = in.readUTF();
@ -298,8 +294,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVInt(querySourceLength); out.writeBytesHolder(querySource, querySourceOffset, querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
if (routing == null) { if (routing == null) {
out.writeBoolean(false); out.writeBoolean(false);

View File

@ -22,13 +22,12 @@ package org.elasticsearch.action.deletebyquery;
import gnu.trove.set.hash.THashSet; import gnu.trove.set.hash.THashSet;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
@ -37,12 +36,10 @@ import static org.elasticsearch.action.Actions.addValidationError;
/** /**
* Delete by query request to execute on a specific index. * Delete by query request to execute on a specific index.
*
*
*/ */
public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest { public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest {
private byte[] querySource; private BytesHolder querySource;
private String[] types = Strings.EMPTY_ARRAY; private String[] types = Strings.EMPTY_ARRAY;
@Nullable @Nullable
private Set<String> routing; private Set<String> routing;
@ -63,7 +60,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
IndexDeleteByQueryRequest() { IndexDeleteByQueryRequest() {
} }
byte[] querySource() { BytesHolder querySource() {
return querySource; return querySource;
} }
@ -76,17 +73,6 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
return validationException; return validationException;
} }
@Required
public IndexDeleteByQueryRequest querySource(QueryBuilder queryBuilder) {
return querySource(queryBuilder.buildAsBytes());
}
@Required
public IndexDeleteByQueryRequest querySource(byte[] querySource) {
this.querySource = querySource;
return this;
}
Set<String> routing() { Set<String> routing() {
return this.routing; return this.routing;
} }
@ -106,8 +92,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
querySource = new byte[in.readVInt()]; querySource = in.readBytesReference();
in.readFully(querySource);
int typesSize = in.readVInt(); int typesSize = in.readVInt();
if (typesSize > 0) { if (typesSize > 0) {
types = new String[typesSize]; types = new String[typesSize];
@ -133,8 +118,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVInt(querySource.length); out.writeBytesHolder(querySource);
out.writeBytes(querySource);
out.writeVInt(types.length); out.writeVInt(types.length);
for (String type : types) { for (String type : types) {
out.writeUTF(type); out.writeUTF(type);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.deletebyquery;
import gnu.trove.set.hash.THashSet; import gnu.trove.set.hash.THashSet;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
@ -36,13 +37,11 @@ import static org.elasticsearch.action.Actions.addValidationError;
/** /**
* Delete by query request to execute on a specific shard. * Delete by query request to execute on a specific shard.
*
*
*/ */
public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest { public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest {
private int shardId; private int shardId;
private byte[] querySource; private BytesHolder querySource;
private String[] types = Strings.EMPTY_ARRAY; private String[] types = Strings.EMPTY_ARRAY;
@Nullable @Nullable
private Set<String> routing; private Set<String> routing;
@ -77,7 +76,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
return this.shardId; return this.shardId;
} }
public byte[] querySource() { BytesHolder querySource() {
return querySource; return querySource;
} }
@ -96,8 +95,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
querySource = new byte[in.readVInt()]; querySource = in.readBytesReference();
in.readFully(querySource);
shardId = in.readVInt(); shardId = in.readVInt();
int typesSize = in.readVInt(); int typesSize = in.readVInt();
if (typesSize > 0) { if (typesSize > 0) {
@ -125,8 +123,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVInt(querySource.length); out.writeBytesHolder(querySource);
out.writeBytes(querySource);
out.writeVInt(shardId); out.writeVInt(shardId);
out.writeVInt(types.length); out.writeVInt(types.length);
for (String type : types) { for (String type : types) {
@ -154,7 +151,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
public String toString() { public String toString() {
String sSource = "_na_"; String sSource = "_na_";
try { try {
sSource = Unicode.fromBytes(querySource); sSource = Unicode.fromBytes(querySource.bytes(), querySource.offset(), querySource.length());
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }

View File

@ -27,6 +27,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.Filter; import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent; import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
@ -692,7 +693,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class DeleteByQuery { static class DeleteByQuery {
private final Query query; private final Query query;
private final byte[] source; private final BytesHolder source;
private final String[] filteringAliases; private final String[] filteringAliases;
private final Filter aliasFilter; private final Filter aliasFilter;
private final String[] types; private final String[] types;
@ -700,7 +701,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private long startTime; private long startTime;
private long endTime; private long endTime;
public DeleteByQuery(Query query, byte[] source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, String... types) { public DeleteByQuery(Query query, BytesHolder source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, String... types) {
this.query = query; this.query = query;
this.source = source; this.source = source;
this.types = types; this.types = types;
@ -712,7 +713,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.query; return this.query;
} }
public byte[] source() { public BytesHolder source() {
return this.source; return this.source;
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.shard.service;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
@ -83,7 +84,7 @@ public interface IndexShard extends IndexShardComponent {
void delete(Engine.Delete delete) throws ElasticSearchException; void delete(Engine.Delete delete) throws ElasticSearchException;
Engine.DeleteByQuery prepareDeleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException; Engine.DeleteByQuery prepareDeleteByQuery(BytesHolder querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException; void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -345,12 +346,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
@Override @Override
public Engine.DeleteByQuery prepareDeleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException { public Engine.DeleteByQuery prepareDeleteByQuery(BytesHolder querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
if (types == null) { if (types == null) {
types = Strings.EMPTY_ARRAY; types = Strings.EMPTY_ARRAY;
} }
Query query = queryParserService.parse(querySource).query(); Query query = queryParserService.parse(querySource.bytes(), querySource.offset(), querySource.length()).query();
query = filterQueryIfNeeded(query, types); query = filterQueryIfNeeded(query, types);
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases); Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);

View File

@ -553,7 +553,7 @@ public interface Translog extends IndexShardComponent {
} }
static class DeleteByQuery implements Operation { static class DeleteByQuery implements Operation {
private byte[] source; private BytesHolder source;
@Nullable @Nullable
private String[] filteringAliases; private String[] filteringAliases;
private String[] types = Strings.EMPTY_ARRAY; private String[] types = Strings.EMPTY_ARRAY;
@ -565,7 +565,7 @@ public interface Translog extends IndexShardComponent {
this(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()); this(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types());
} }
public DeleteByQuery(byte[] source, String[] filteringAliases, String... types) { public DeleteByQuery(BytesHolder source, String[] filteringAliases, String... types) {
this.source = source; this.source = source;
this.types = types == null ? Strings.EMPTY_ARRAY : types; this.types = types == null ? Strings.EMPTY_ARRAY : types;
this.filteringAliases = filteringAliases; this.filteringAliases = filteringAliases;
@ -578,10 +578,10 @@ public interface Translog extends IndexShardComponent {
@Override @Override
public long estimateSize() { public long estimateSize() {
return source.length + 8; return source.length() + 8;
} }
public byte[] source() { public BytesHolder source() {
return this.source; return this.source;
} }
@ -601,8 +601,7 @@ public interface Translog extends IndexShardComponent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version int version = in.readVInt(); // version
source = new byte[in.readVInt()]; source = in.readBytesReference();
in.readFully(source);
if (version < 2) { if (version < 2) {
// for query_parser_name, which was removed // for query_parser_name, which was removed
if (in.readBoolean()) { if (in.readBoolean()) {
@ -630,8 +629,7 @@ public interface Translog extends IndexShardComponent {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(2); // version out.writeVInt(2); // version
out.writeVInt(source.length); out.writeBytesHolder(source);
out.writeBytes(source);
out.writeVInt(types.length); out.writeVInt(types.length);
for (String type : types) { for (String type : types) {
out.writeUTF(type); out.writeUTF(type);

View File

@ -130,7 +130,7 @@ public abstract class AbstractSimpleTranslogTests {
assertThat(snapshot.estimatedTotalOperations(), equalTo(3)); assertThat(snapshot.estimatedTotalOperations(), equalTo(3));
snapshot.release(); snapshot.release();
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null)); translog.add(new Translog.DeleteByQuery(new BytesHolder(new byte[]{4}), null));
snapshot = translog.snapshot(); snapshot = translog.snapshot();
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(4)); MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(4));
assertThat(snapshot.estimatedTotalOperations(), equalTo(4)); assertThat(snapshot.estimatedTotalOperations(), equalTo(4));
@ -152,7 +152,7 @@ public abstract class AbstractSimpleTranslogTests {
assertThat(snapshot.hasNext(), equalTo(true)); assertThat(snapshot.hasNext(), equalTo(true));
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next(); Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) snapshot.next();
assertThat(deleteByQuery.source(), equalTo(new byte[]{4})); assertThat(deleteByQuery.source().copyBytes(), equalTo(new byte[]{4}));
assertThat(snapshot.hasNext(), equalTo(false)); assertThat(snapshot.hasNext(), equalTo(false));