Fix InternalSearchHits serialization to be deterministic

The assertion on binary equality for streamable serialization
sometimes fails due to the usage of identity hashmaps inside
the InternalSearchHits serialization. This only happens if
the number of shards the result set is composed of is very high.
This commit makes the serialization deterministic and removes
the need to serialize the ordinal due to in-order serialization.
This commit is contained in:
Simon Willnauer 2014-07-22 12:03:13 +02:00
parent 924f91588b
commit 8db7b2b20b
3 changed files with 30 additions and 19 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.internal;
import com.carrotsearch.hppc.IntObjectOpenHashMap; import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,9 +31,7 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException; import java.io.IOException;
import java.util.IdentityHashMap; import java.util.*;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.search.SearchShardTarget.readSearchShardTarget; import static org.elasticsearch.search.SearchShardTarget.readSearchShardTarget;
import static org.elasticsearch.search.internal.InternalSearchHit.readSearchHit; import static org.elasticsearch.search.internal.InternalSearchHit.readSearchHit;
@ -212,9 +211,16 @@ public class InternalSearchHits implements SearchHits {
} else { } else {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) { if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// read the lookup table first // read the lookup table first
int lookupSize = in.readVInt(); final IntObjectOpenHashMap<SearchShardTarget> handleShardLookup = context.handleShardLookup();
final int lookupSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
for (int i = 0; i < lookupSize; i++) { for (int i = 0; i < lookupSize; i++) {
context.handleShardLookup().put(in.readVInt(), readSearchShardTarget(in)); handleShardLookup.put(i + 1, readSearchShardTarget(in));
}
} else {
for (int i = 0; i < lookupSize; i++) {
handleShardLookup.put(in.readVInt(), readSearchShardTarget(in));
}
} }
} }
@ -238,19 +244,26 @@ public class InternalSearchHits implements SearchHits {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) { if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// start from 1, 0 is for null! // start from 1, 0 is for null!
int counter = 1; int counter = 1;
List<SearchShardTarget> targets = new ArrayList<>();
for (InternalSearchHit hit : hits) { for (InternalSearchHit hit : hits) {
if (hit.shard() != null) { if (hit.shard() != null) {
Integer handle = context.shardHandleLookup().get(hit.shard()); Integer handle = context.shardHandleLookup().get(hit.shard());
if (handle == null) { if (handle == null) {
context.shardHandleLookup().put(hit.shard(), counter++); context.shardHandleLookup().put(hit.shard(), counter++);
targets.add(hit.shard());
} }
} }
} }
out.writeVInt(context.shardHandleLookup().size()); out.writeVInt(targets.size());
if (!context.shardHandleLookup().isEmpty()) { if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
for (Map.Entry<SearchShardTarget, Integer> entry : context.shardHandleLookup().entrySet()) { for (int i = 0; i < targets.size(); i++) {
out.writeVInt(entry.getValue()); // the ordinal is implicit here since we write it in-order
entry.getKey().writeTo(out); targets.get(i).writeTo(out);
}
} else {
for (int i = 0; i < targets.size(); i++) {
out.writeVInt(i+1);
targets.get(i).writeTo(out);
} }
} }
} }

View File

@ -20,9 +20,7 @@
package org.elasticsearch.search.sort; package org.elasticsearch.search.sort;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.UnicodeUtil; import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -63,24 +61,24 @@ import static org.hamcrest.Matchers.*;
public class SimpleSortTests extends ElasticsearchIntegrationTest { public class SimpleSortTests extends ElasticsearchIntegrationTest {
@LuceneTestCase.AwaitsFix(bugUrl = "simon is working on this") @Test
public void testIssue6614() throws ExecutionException, InterruptedException { public void testIssue6614() throws ExecutionException, InterruptedException {
List<IndexRequestBuilder> builders = new ArrayList<>(); List<IndexRequestBuilder> builders = new ArrayList<>();
boolean strictTimeBasedIndices = randomBoolean(); boolean strictTimeBasedIndices = randomBoolean();
final int numIndices = randomIntBetween(2, 25); // at most 25 days in the month final int numIndices = scaledRandomIntBetween(2, 25); // at most 25 days in the month
for (int i = 0; i < numIndices; i++) { for (int i = 0; i < numIndices; i++) {
final String indexId = strictTimeBasedIndices ? "idx_" + i : "idx"; final String indexId = strictTimeBasedIndices ? "idx_" + i : "idx";
if (strictTimeBasedIndices || i == 0) { if (strictTimeBasedIndices || i == 0) {
createIndex(indexId); createIndex(indexId);
} }
final int numDocs = randomIntBetween(1, 23); // hour of the day final int numDocs = scaledRandomIntBetween(1, 23); // hour of the day
for (int j = 0; j < numDocs; j++) { for (int j = 0; j < numDocs; j++) {
builders.add(client().prepareIndex(indexId, "type").setSource("foo", "bar", "timeUpdated", "2014/07/" + String.format(Locale.ROOT, "%02d", i+1)+" " + String.format(Locale.ROOT, "%02d", j+1) + ":00:00")); builders.add(client().prepareIndex(indexId, "type").setSource("foo", "bar", "timeUpdated", "2014/07/" + String.format(Locale.ROOT, "%02d", i+1)+" " + String.format(Locale.ROOT, "%02d", j+1) + ":00:00"));
} }
} }
int docs = builders.size(); int docs = builders.size();
indexRandom(true, builders); indexRandom(true, builders);
ensureYellow(); ensureGreen();
SearchResponse allDocsResponse = client().prepareSearch().setQuery(QueryBuilders.filteredQuery(matchAllQuery(), SearchResponse allDocsResponse = client().prepareSearch().setQuery(QueryBuilders.filteredQuery(matchAllQuery(),
FilterBuilders.boolFilter().must(FilterBuilders.termFilter("foo", "bar"), FilterBuilders.boolFilter().must(FilterBuilders.termFilter("foo", "bar"),
FilterBuilders.rangeFilter("timeUpdated").gte("2014/0" + randomIntBetween(1, 7) + "/01").cache(randomBoolean())))) FilterBuilders.rangeFilter("timeUpdated").gte("2014/0" + randomIntBetween(1, 7) + "/01").cache(randomBoolean()))))

View File

@ -494,8 +494,8 @@ public class ElasticsearchAssertions {
StreamInput input = new BytesStreamInput(orig); StreamInput input = new BytesStreamInput(orig);
input.setVersion(version); input.setVersion(version);
newInstance.readFrom(input); newInstance.readFrom(input);
assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0)); assertThat("Stream should be fully read with version [" + version + "] for streamable [" + newInstance + "]", input.available(), equalTo(0));
assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", serialize(version, streamable), equalTo(orig)); assertThat("Serialization failed with version [" + version + "] bytes should be equal for streamable [" + streamable + "]", serialize(version, newInstance), equalTo(orig));
} catch (Throwable ex) { } catch (Throwable ex) {
throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex); throw new RuntimeException("failed to check serialization - version [" + version + "] for streamable [" + streamable + "]", ex);
} }