Merge branch 'master' into feature/multi_cluster_search

This commit is contained in:
Simon Willnauer 2017-01-11 12:41:26 +01:00
commit 2aae409508
15 changed files with 42 additions and 82 deletions

View File

@ -297,7 +297,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void raiseEarlyFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.id(), connection);
} catch (Exception inner) {
inner.addSuppressed(e);
@ -322,7 +322,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (queryResult.hasHits()
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), connection);
} catch (Exception e) {
logger.trace("failed to release context", e);

View File

@ -76,7 +76,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, connection, querySearchRequest);
}

View File

@ -85,7 +85,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection);
}
@ -156,7 +156,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
}

View File

@ -71,9 +71,9 @@ import java.util.stream.StreamSupport;
public class SearchPhaseController extends AbstractComponent {
private static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
int i = o1.value.shardTarget().getIndex().compareTo(o2.value.shardTarget().getIndex());
if (i == 0) {
i = o1.value.shardTarget().shardId().id() - o2.value.shardTarget().shardId().id();
i = o1.value.shardTarget().getShardId().id() - o2.value.shardTarget().getShardId().id();
}
return i;
};

View File

@ -91,7 +91,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().nodeId());
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
}

View File

@ -185,7 +185,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId());
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {

View File

@ -93,7 +93,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
@Override
public String index() {
if (shardTarget != null) {
return shardTarget.index();
return shardTarget.getIndex();
}
return null;
}
@ -104,7 +104,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
@Override
public int shardId() {
if (shardTarget != null) {
return shardTarget.shardId().id();
return shardTarget.getShardId().id();
}
return -1;
}
@ -156,7 +156,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
builder.field("shard", shardId());
builder.field("index", index());
if (shardTarget != null) {
builder.field("node", shardTarget.nodeId());
builder.field("node", shardTarget.getNodeId());
}
if (cause != null) {
builder.field("reason");

View File

@ -21,11 +21,9 @@ package org.elasticsearch.action.search;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.RAMOutputStream;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import java.io.IOException;
import java.util.Base64;
@ -53,7 +51,7 @@ final class TransportSearchHelper {
for (AtomicArray.Entry<? extends SearchPhaseResult> entry : searchPhaseResults.asList()) {
SearchPhaseResult searchPhaseResult = entry.value;
out.writeLong(searchPhaseResult.id());
out.writeString(searchPhaseResult.shardTarget().nodeId());
out.writeString(searchPhaseResult.shardTarget().getNodeId());
}
byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.search;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.Index;
@ -35,21 +34,20 @@ import java.io.IOException;
*/
public class SearchShardTarget implements Writeable, Comparable<SearchShardTarget> {
private Text nodeId;
private Text index;
private ShardId shardId;
private final Text nodeId;
private final ShardId shardId;
public SearchShardTarget(StreamInput in) throws IOException {
if (in.readBoolean()) {
nodeId = in.readText();
} else {
nodeId = null;
}
shardId = ShardId.readShardId(in);
index = new Text(shardId.getIndexName());
}
public SearchShardTarget(String nodeId, ShardId shardId) {
this.nodeId = nodeId == null ? null : new Text(nodeId);
this.index = new Text(shardId.getIndexName());
this.shardId = shardId;
}
@ -58,33 +56,16 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
}
@Nullable
public String nodeId() {
public String getNodeId() {
return nodeId.string();
}
@Nullable
public String getNodeId() {
return nodeId();
}
public Text nodeIdText() {
public Text getNodeIdText() {
return this.nodeId;
}
public String index() {
return index.string();
}
public String getIndex() {
return index();
}
public Text indexText() {
return this.index;
}
public ShardId shardId() {
return shardId;
return shardId.getIndexName();
}
public ShardId getShardId() {
@ -93,7 +74,7 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
@Override
public int compareTo(SearchShardTarget o) {
int i = index.string().compareTo(o.index());
int i = shardId.getIndexName().compareTo(o.getIndex());
if (i == 0) {
i = shardId.getId() - o.shardId.id();
}
@ -125,7 +106,7 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
@Override
public int hashCode() {
int result = nodeId != null ? nodeId.hashCode() : 0;
result = 31 * result + (index != null ? index.hashCode() : 0);
result = 31 * result + (shardId.getIndexName() != null ? shardId.getIndexName().hashCode() : 0);
result = 31 * result + shardId.hashCode();
return result;
}

View File

@ -99,7 +99,7 @@ public class InternalSearchHit implements SearchHit {
@Nullable
private SearchShardTarget shard;
private transient Text index;
private transient String index;
private Map<String, Object> sourceAsMap;
private byte[] sourceAsBytes;
@ -134,15 +134,6 @@ public class InternalSearchHit implements SearchHit {
return this.docId;
}
public void shardTarget(SearchShardTarget shardTarget) {
shard(shardTarget);
if (innerHits != null) {
for (InternalSearchHits searchHits : innerHits.values()) {
searchHits.shardTarget(shardTarget);
}
}
}
public void score(float score) {
this.score = score;
}
@ -173,7 +164,7 @@ public class InternalSearchHit implements SearchHit {
@Override
public String index() {
return this.index == null ? null : this.index.string();
return this.index;
}
@Override
@ -238,14 +229,6 @@ public class InternalSearchHit implements SearchHit {
return sourceRef();
}
/**
* Internal source representation, might be compressed....
*/
public BytesReference internalSourceRef() {
return source;
}
@Override
public byte[] source() {
if (source == null) {
@ -327,10 +310,6 @@ public class InternalSearchHit implements SearchHit {
this.fields = fields;
}
public Map<String, HighlightField> internalHighlightFields() {
return highlightFields;
}
@Override
public Map<String, HighlightField> highlightFields() {
return highlightFields == null ? emptyMap() : highlightFields;
@ -390,7 +369,7 @@ public class InternalSearchHit implements SearchHit {
public void shard(SearchShardTarget target) {
this.shard = target;
if (target != null) {
this.index = target.indexText();
this.index = target.getIndex();
}
}
@ -457,8 +436,8 @@ public class InternalSearchHit implements SearchHit {
// For inner_hit hits shard is null and that is ok, because the parent search hit has all this information.
// Even if this was included in the inner_hit hits this would be the same, so better leave it out.
if (explanation() != null && shard != null) {
builder.field(Fields._SHARD, shard.shardId());
builder.field(Fields._NODE, shard.nodeIdText());
builder.field(Fields._SHARD, shard.getShardId());
builder.field(Fields._NODE, shard.getNodeIdText());
}
if (nestedIdentity != null) {
nestedIdentity.toXContent(builder, params);
@ -535,7 +514,7 @@ public class InternalSearchHit implements SearchHit {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
String currentFieldName = null;
String type = null, id = null;
Text index = null;
String index = null;
float score = DEFAULT_SCORE;
long version = -1;
SearchSortValues sortValues = SearchSortValues.EMPTY;
@ -555,7 +534,7 @@ public class InternalSearchHit implements SearchHit {
if (Fields._TYPE.equals(currentFieldName)) {
type = parser.text();
} else if (Fields._INDEX.equals(currentFieldName)) {
index = new Text(parser.text());
index = parser.text();
} else if (Fields._ID.equals(currentFieldName)) {
id = parser.text();
} else if (Fields._SCORE.equals(currentFieldName)) {

View File

@ -22,11 +22,13 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.test.ESTestCase;
public class LiveVersionMapTests extends ESTestCase {
public void testRamBytesUsed() throws Exception {
assumeTrue("Test disabled for JDK 9", JavaVersion.current().compareTo(JavaVersion.parse("9")) < 0);
LiveVersionMap map = new LiveVersionMap();
for (int i = 0; i < 100000; ++i) {
BytesRefBuilder uid = new BytesRefBuilder();

View File

@ -162,25 +162,25 @@ public class InternalSearchHitTests extends ESTestCase {
Map<String, InternalSearchHits> innerHits = new HashMap<>();
InternalSearchHit innerHit1 = new InternalSearchHit(0, "_id", new Text("_type"), null);
innerHit1.shardTarget(target);
innerHit1.shard(target);
InternalSearchHit innerInnerHit2 = new InternalSearchHit(0, "_id", new Text("_type"), null);
innerInnerHit2.shardTarget(target);
innerInnerHit2.shard(target);
innerHits.put("1", new InternalSearchHits(new InternalSearchHit[]{innerInnerHit2}, 1, 1f));
innerHit1.setInnerHits(innerHits);
InternalSearchHit innerHit2 = new InternalSearchHit(0, "_id", new Text("_type"), null);
innerHit2.shardTarget(target);
innerHit2.shard(target);
InternalSearchHit innerHit3 = new InternalSearchHit(0, "_id", new Text("_type"), null);
innerHit3.shardTarget(target);
innerHit3.shard(target);
innerHits = new HashMap<>();
InternalSearchHit hit1 = new InternalSearchHit(0, "_id", new Text("_type"), null);
innerHits.put("1", new InternalSearchHits(new InternalSearchHit[]{innerHit1, innerHit2}, 1, 1f));
innerHits.put("2", new InternalSearchHits(new InternalSearchHit[]{innerHit3}, 1, 1f));
hit1.shardTarget(target);
hit1.shard(target);
hit1.setInnerHits(innerHits);
InternalSearchHit hit2 = new InternalSearchHit(0, "_id", new Text("_type"), null);
hit2.shardTarget(target);
hit2.shard(target);
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit1, hit2}, 2, 1f);

View File

@ -90,9 +90,9 @@ public class SearchPreferenceIT extends ESIntegTestCase {
final Client client = internalCluster().smartClient();
SearchResponse searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).execute().actionGet();
String firstNodeId = searchResponse.getHits().getAt(0).shard().nodeId();
String firstNodeId = searchResponse.getHits().getAt(0).shard().getNodeId();
searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).execute().actionGet();
String secondNodeId = searchResponse.getHits().getAt(0).shard().nodeId();
String secondNodeId = searchResponse.getHits().getAt(0).shard().getNodeId();
assertThat(firstNodeId, not(equalTo(secondNodeId)));
}
@ -220,7 +220,7 @@ public class SearchPreferenceIT extends ESIntegTestCase {
for (int i = 0; i < 2; i++) {
SearchResponse searchResponse = request.execute().actionGet();
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
hitNodes.add(searchResponse.getHits().getAt(0).shard().nodeId());
hitNodes.add(searchResponse.getHits().getAt(0).shard().getNodeId());
}
assertThat(hitNodes.size(), greaterThan(1));
}

View File

@ -248,7 +248,7 @@ In some environments, it may make more sense to prepare a custom image containin
FROM docker.elastic.co/elasticsearch/elasticsearch:{version}
ADD elasticsearch.yml /usr/share/elasticsearch/config/
USER root
chown elasticsearch:elasticsearch config/elasticsearch.yml
RUN chown elasticsearch:elasticsearch config/elasticsearch.yml
USER elasticsearch
--------------------------------------------

View File

@ -185,7 +185,7 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
} else {
failures = new ArrayList<>(response.getShardFailures().length);
for (ShardSearchFailure failure: response.getShardFailures()) {
String nodeId = failure.shard() == null ? null : failure.shard().nodeId();
String nodeId = failure.shard() == null ? null : failure.shard().getNodeId();
failures.add(new SearchFailure(failure.getCause(), failure.index(), failure.shardId(), nodeId));
}
}