move query hint only to broadcast requests that use it

This commit is contained in:
kimchy 2010-08-26 14:31:54 +03:00
parent dfa24f6d03
commit d56b4d266e
9 changed files with 73 additions and 26 deletions

View File

@ -24,18 +24,22 @@ import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class BroadcastPingRequest extends BroadcastOperationRequest { public class BroadcastPingRequest extends BroadcastOperationRequest {
@Nullable protected String queryHint;
BroadcastPingRequest() { BroadcastPingRequest() {
} }
public BroadcastPingRequest(String... indices) { public BroadcastPingRequest(String... indices) {
super(indices, null); super(indices);
this.queryHint = null;
} }
@Override public BroadcastPingRequest operationThreading(BroadcastOperationThreading operationThreading) { @Override public BroadcastPingRequest operationThreading(BroadcastOperationThreading operationThreading) {
@ -48,6 +52,10 @@ public class BroadcastPingRequest extends BroadcastOperationRequest {
return this; return this;
} }
public String queryHint() {
return this.queryHint;
}
public BroadcastPingRequest queryHint(String queryHint) { public BroadcastPingRequest queryHint(String queryHint) {
this.queryHint = queryHint; this.queryHint = queryHint;
return this; return this;
@ -55,9 +63,18 @@ public class BroadcastPingRequest extends BroadcastOperationRequest {
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
if (in.readBoolean()) {
queryHint = in.readUTF();
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
if (queryHint == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
} }
} }

View File

@ -37,7 +37,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest {
} }
public ClearIndicesCacheRequest(String... indices) { public ClearIndicesCacheRequest(String... indices) {
super(indices, null); super(indices);
// we want to do the refresh in parallel on local shards... // we want to do the refresh in parallel on local shards...
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
} }

View File

@ -53,7 +53,7 @@ public class FlushRequest extends BroadcastOperationRequest {
* be flushed. * be flushed.
*/ */
public FlushRequest(String... indices) { public FlushRequest(String... indices) {
super(indices, null); super(indices);
// we want to do the refresh in parallel on local shards... // we want to do the refresh in parallel on local shards...
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
} }

View File

@ -59,7 +59,7 @@ public class OptimizeRequest extends BroadcastOperationRequest {
* @param indices The indices to optimize, no indices passed means all indices will be optimized. * @param indices The indices to optimize, no indices passed means all indices will be optimized.
*/ */
public OptimizeRequest(String... indices) { public OptimizeRequest(String... indices) {
super(indices, null); super(indices);
// we want to do the optimize in parallel on local shards... // we want to do the optimize in parallel on local shards...
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
} }

View File

@ -44,7 +44,7 @@ public class RefreshRequest extends BroadcastOperationRequest {
} }
public RefreshRequest(String... indices) { public RefreshRequest(String... indices) {
super(indices, null); super(indices);
// we want to do the refresh in parallel on local shards... // we want to do the refresh in parallel on local shards...
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
} }

View File

@ -33,7 +33,7 @@ public class IndicesStatusRequest extends BroadcastOperationRequest {
} }
public IndicesStatusRequest(String... indices) { public IndicesStatusRequest(String... indices) {
super(indices, null); super(indices);
} }
@Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) { @Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.common.xcontent.builder.XContentBuilder; import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@ -62,6 +63,8 @@ public class CountRequest extends BroadcastOperationRequest {
private float minScore = DEFAULT_MIN_SCORE; private float minScore = DEFAULT_MIN_SCORE;
@Nullable protected String queryHint;
private byte[] querySource; private byte[] querySource;
private int querySourceOffset; private int querySourceOffset;
private int querySourceLength; private int querySourceLength;
@ -78,7 +81,8 @@ public class CountRequest extends BroadcastOperationRequest {
* run against all indices. * run against all indices.
*/ */
public CountRequest(String... indices) { public CountRequest(String... indices) {
super(indices, null); super(indices);
this.queryHint = null;
} }
@Override public ActionRequestValidationException validate() { @Override public ActionRequestValidationException validate() {
@ -89,6 +93,10 @@ public class CountRequest extends BroadcastOperationRequest {
return validationException; return validationException;
} }
public String queryHint() {
return queryHint;
}
/** /**
* Controls the operation threading model. * Controls the operation threading model.
*/ */
@ -261,6 +269,10 @@ public class CountRequest extends BroadcastOperationRequest {
super.readFrom(in); super.readFrom(in);
minScore = in.readFloat(); minScore = in.readFloat();
if (in.readBoolean()) {
queryHint = in.readUTF();
}
querySourceUnsafe = false; querySourceUnsafe = false;
querySourceOffset = 0; querySourceOffset = 0;
querySourceLength = in.readVInt(); querySourceLength = in.readVInt();
@ -283,6 +295,13 @@ public class CountRequest extends BroadcastOperationRequest {
super.writeTo(out); super.writeTo(out);
out.writeFloat(minScore); out.writeFloat(minScore);
if (queryHint == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
out.writeVInt(querySourceLength); out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength); out.writeBytes(querySource, querySourceOffset, querySourceLength);

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
/** /**
@ -35,7 +34,6 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
protected String[] indices; protected String[] indices;
@Nullable protected String queryHint;
private boolean listenerThreaded = false; private boolean listenerThreaded = false;
private BroadcastOperationThreading operationThreading = BroadcastOperationThreading.SINGLE_THREAD; private BroadcastOperationThreading operationThreading = BroadcastOperationThreading.SINGLE_THREAD;
@ -44,9 +42,8 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
} }
protected BroadcastOperationRequest(String[] indices, @Nullable String queryHint) { protected BroadcastOperationRequest(String[] indices) {
this.indices = indices; this.indices = indices;
this.queryHint = queryHint;
} }
public String[] indices() { public String[] indices() {
@ -58,10 +55,6 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
return this; return this;
} }
public String queryHint() {
return queryHint;
}
@Override public ActionRequestValidationException validate() { @Override public ActionRequestValidationException validate() {
return null; return null;
} }
@ -116,12 +109,6 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
out.writeUTF(index); out.writeUTF(index);
} }
} }
if (queryHint == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
out.writeByte(operationThreading.id()); out.writeByte(operationThreading.id());
} }
@ -135,9 +122,6 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
indices[i] = in.readUTF(); indices[i] = in.readUTF();
} }
} }
if (in.readBoolean()) {
queryHint = in.readUTF();
}
operationThreading = BroadcastOperationThreading.fromId(in.readByte()); operationThreading = BroadcastOperationThreading.fromId(in.readByte());
} }
} }

View File

@ -127,7 +127,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
} }
/** /**
* All the shards (replicas) for the provided indices grouped (each group is a single element, consisting * All the shards (primary + replicas) for the provided indices grouped (each group is a single element, consisting
* of the shard). This is handy for components that expect to get group iterators, but still want in some * of the shard). This is handy for components that expect to get group iterators, but still want in some
* cases to iterate over all the shards (and not just one shard in replication group). * cases to iterate over all the shards (and not just one shard in replication group).
* *
@ -155,6 +155,33 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return its; return its;
} }
/**
* All the primary shards for the provided indices grouped (each group is a single element, consisting
* of the primary shard). This is handy for components that expect to get group iterators, but still want in some
* cases to iterate over all primary shards (and not just one shard in replication group).
*
* @param indices The indices to return all the shards (replicas), can be <tt>null</tt> or empty array to indicate all indices
* @return All the primary shards grouped into a single shard element group each
* @throws IndexMissingException If an index passed does not exists
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator primaryShardsGrouped(String... indices) throws IndexMissingException {
GroupShardsIterator its = new GroupShardsIterator();
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
}
for (String index : indices) {
IndexRoutingTable indexRoutingTable = index(index);
if (indexRoutingTable == null) {
throw new IndexMissingException(new Index(index));
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
its.add(indexShardRoutingTable.primaryShard().shardsIt());
}
}
return its;
}
public static Builder newRoutingTableBuilder() { public static Builder newRoutingTableBuilder() {
return new Builder(); return new Builder();
} }