improve count serialization and deserialization

This commit is contained in:
kimchy 2010-05-13 18:12:30 +03:00
parent a9cac052ec
commit bbdbfbeb59
9 changed files with 122 additions and 42 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.count;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.Actions;
@ -35,8 +36,8 @@ import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.xcontent.XContentFactory;
import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -60,11 +61,14 @@ public class CountRequest extends BroadcastOperationRequest {
public static final float DEFAULT_MIN_SCORE = -1f;
private float minScore = DEFAULT_MIN_SCORE;
@Required private byte[] querySource;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String queryParserName;
private transient QueryBuilder queryBuilder = null;
private byte[] querySource;
private int querySourceOffset;
private int querySourceLength;
private boolean querySourceUnsafe;
private String[] types = Strings.EMPTY_ARRAY;
private String queryParserName;
CountRequest() {
}
@ -79,7 +83,7 @@ public class CountRequest extends BroadcastOperationRequest {
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (querySource == null && queryBuilder == null) {
if (querySource == null) {
validationException = Actions.addValidationError("query is missing", validationException);
}
return validationException;
@ -93,6 +97,14 @@ public class CountRequest extends BroadcastOperationRequest {
return this;
}
@Override protected void beforeLocalFork() {
if (querySourceUnsafe) {
querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceLength);
querySourceOffset = 0;
querySourceUnsafe = false;
}
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -134,20 +146,28 @@ public class CountRequest extends BroadcastOperationRequest {
* The query source to execute.
*/
byte[] querySource() {
if (querySource == null && queryBuilder != null) {
// did not get serialized...
querySource = queryBuilder.buildAsBytes(contentType);
}
return querySource;
}
int querySourceOffset() {
return querySourceOffset;
}
int querySourceLength() {
return querySourceLength;
}
/**
* The query source to execute.
*
* @see org.elasticsearch.index.query.xcontent.QueryBuilders
*/
@Required public CountRequest query(QueryBuilder queryBuilder) {
this.queryBuilder = queryBuilder;
FastByteArrayOutputStream bos = queryBuilder.buildAsUnsafeBytes();
this.querySource = bos.unsafeByteArray();
this.querySourceOffset = 0;
this.querySourceLength = bos.size();
this.querySourceUnsafe = true;
return this;
}
@ -158,11 +178,22 @@ public class CountRequest extends BroadcastOperationRequest {
try {
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType);
builder.map(querySource);
this.querySource = builder.copiedBytes();
return query(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + querySource + "]", e);
}
return this;
}
@Required public CountRequest query(XContentBuilder builder) {
try {
this.querySource = builder.unsafeBytes();
this.querySourceOffset = 0;
this.querySourceLength = builder.unsafeBytesLength();
this.querySourceUnsafe = true;
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e);
}
}
/**
@ -170,14 +201,29 @@ public class CountRequest extends BroadcastOperationRequest {
* or {@link #query(org.elasticsearch.index.query.QueryBuilder)}.
*/
@Required public CountRequest query(String querySource) {
return query(Unicode.fromStringAsBytes(querySource));
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(querySource);
this.querySource = result.result;
this.querySourceOffset = 0;
this.querySourceLength = result.length;
this.querySourceUnsafe = true;
return this;
}
/**
* The query source to execute.
*/
@Required public CountRequest query(byte[] querySource) {
return query(querySource, 0, querySource.length);
}
/**
* The query source to execute.
*/
@Required public CountRequest query(byte[] querySource, int offset, int length) {
this.querySource = querySource;
this.querySourceOffset = offset;
this.querySourceLength = length;
this.querySourceUnsafe = false;
return this;
}
@ -214,8 +260,13 @@ public class CountRequest extends BroadcastOperationRequest {
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minScore = in.readFloat();
querySource = new byte[in.readVInt()];
in.readFully(querySource());
querySourceUnsafe = false;
querySourceOffset = 0;
querySourceLength = in.readVInt();
querySource = new byte[querySourceLength];
in.readFully(querySource);
if (in.readBoolean()) {
queryParserName = in.readUTF();
}
@ -231,14 +282,10 @@ public class CountRequest extends BroadcastOperationRequest {
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(minScore);
if (querySource != null) {
out.writeVInt(querySource.length);
out.writeBytes(querySource);
} else {
FastByteArrayOutputStream os = queryBuilder.buildAsUnsafeBytes(contentType);
out.writeVInt(os.size());
out.writeBytes(os.unsafeByteArray(), 0, os.size());
}
out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
if (queryParserName == null) {
out.writeBoolean(false);
} else {

View File

@ -35,7 +35,11 @@ import java.io.IOException;
class ShardCountRequest extends BroadcastShardOperationRequest {
private float minScore;
private byte[] querySource;
private int querySourceOffset;
private int querySourceLength;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String queryParserName;
@ -47,6 +51,8 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
super(index, shardId);
this.minScore = request.minScore();
this.querySource = request.querySource();
this.querySourceOffset = request.querySourceOffset();
this.querySourceLength = request.querySourceLength();
this.queryParserName = request.queryParserName();
this.types = request.types();
}
@ -59,6 +65,14 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
return querySource;
}
public int querySourceOffset() {
return querySourceOffset;
}
public int querySourceLength() {
return querySourceLength;
}
public String queryParserName() {
return queryParserName;
}

View File

@ -103,7 +103,8 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
long count = indexShard.count(request.minScore(), request.querySource(), request.queryParserName(), request.types());
long count = indexShard.count(request.minScore(), request.querySource(), request.querySourceOffset(), request.querySourceLength(),
request.queryParserName(), request.types());
return new ShardCountResponse(request.index(), request.shardId(), count);
}
}

View File

@ -103,6 +103,10 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
return operationThreading(BroadcastOperationThreading.fromString(operationThreading, this.operationThreading));
}
protected void beforeLocalFork() {
}
@Override public void writeTo(StreamOutput out) throws IOException {
if (indices == null) {
out.writeVInt(0);

View File

@ -163,6 +163,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
// we have local operations, perform them now
if (localOperations > 0) {
request.beforeLocalFork();
if (request.operationThreading() == BroadcastOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {

View File

@ -32,6 +32,8 @@ public interface IndexQueryParser extends IndexComponent {
Query parse(byte[] source) throws ElasticSearchException;
Query parse(byte[] source, int offset, int length) throws ElasticSearchException;
Query parse(String source) throws ElasticSearchException;
Query parse(QueryBuilder queryBuilder) throws ElasticSearchException;

View File

@ -138,9 +138,13 @@ public class XContentIndexQueryParser extends AbstractIndexComponent implements
}
@Override public Query parse(byte[] source) throws ElasticSearchException {
return parse(source, 0, source.length);
}
@Override public Query parse(byte[] source, int offset, int length) throws ElasticSearchException {
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source).createParser(source);
parser = XContentFactory.xContent(source, offset, length).createParser(source, offset, length);
return parse(cache.get().get(), parser);
} catch (QueryParsingException e) {
throw e;

View File

@ -62,6 +62,8 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
long count(float minScore, byte[] querySource, int querySourceOffset, int querySourceLength, @Nullable String queryParserName, String... types) throws ElasticSearchException;
void refresh(Engine.Refresh refresh) throws ElasticSearchException;
void flush(Engine.Flush flush) throws ElasticSearchException;

View File

@ -113,7 +113,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return translog;
}
public ShardRouting routingEntry() {
@Override public ShardRouting routingEntry() {
return this.shardRouting;
}
@ -193,19 +193,19 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this;
}
public IndexShardState state() {
@Override public IndexShardState state() {
return state;
}
/**
* Returns the estimated flushable memory size. Returns <tt>null</tt> if not available.
*/
public SizeValue estimateFlushableMemorySize() throws ElasticSearchException {
@Override public SizeValue estimateFlushableMemorySize() throws ElasticSearchException {
writeAllowed();
return engine.estimateFlushableMemorySize();
}
public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
@Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
writeAllowed();
return innerCreate(type, id, source);
}
@ -223,7 +223,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return doc;
}
public ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException {
@Override public ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException {
writeAllowed();
return innerIndex(type, id, source);
}
@ -241,7 +241,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return doc;
}
public void delete(String type, String id) {
@Override public void delete(String type, String id) {
writeAllowed();
DocumentMapper docMapper = mapperService.type(type);
if (docMapper == null) {
@ -250,7 +250,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
innerDelete(docMapper.uidMapper().term(type, id));
}
public void delete(Term uid) {
@Override public void delete(Term uid) {
writeAllowed();
innerDelete(uid);
}
@ -262,7 +262,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.delete(new Engine.Delete(uid));
}
public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
writeAllowed();
if (types == null) {
types = Strings.EMPTY_ARRAY;
@ -288,7 +288,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.delete(new Engine.DeleteByQuery(query, querySource, queryParserName, types));
}
public byte[] get(String type, String id) throws ElasticSearchException {
@Override public byte[] get(String type, String id) throws ElasticSearchException {
readAllowed();
DocumentMapper docMapper = mapperService.type(type);
if (docMapper == null) {
@ -315,7 +315,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
public long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
@Override public long count(float minScore, byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
return count(minScore, querySource, 0, querySource.length, queryParserName, types);
}
@Override public long count(float minScore, byte[] querySource, int querySourceOffset, int querySourceLength,
@Nullable String queryParserName, String... types) throws ElasticSearchException {
readAllowed();
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
if (queryParserName != null) {
@ -343,7 +348,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
public void refresh(Engine.Refresh refresh) throws ElasticSearchException {
@Override public void refresh(Engine.Refresh refresh) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("Refresh with {}", refresh);
@ -351,7 +356,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.refresh(refresh);
}
public void flush(Engine.Flush flush) throws ElasticSearchException {
@Override public void flush(Engine.Flush flush) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("Flush");
@ -367,22 +372,22 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.optimize(optimize);
}
public <T> T snapshot(Engine.SnapshotHandler<T> snapshotHandler) throws EngineException {
@Override public <T> T snapshot(Engine.SnapshotHandler<T> snapshotHandler) throws EngineException {
readAllowed();
return engine.snapshot(snapshotHandler);
}
public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException {
@Override public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException {
writeAllowed();
engine.recover(recoveryHandler);
}
public Engine.Searcher searcher() {
@Override public Engine.Searcher searcher() {
readAllowed();
return engine.searcher();
}
public void close() {
@Override public void close() {
synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
if (refreshScheduledFuture != null) {