percolator: Make sure that start time is serialized on the mpercolate shard requests

Closes #15908
This commit is contained in:
Martijn van Groningen 2016-01-12 21:12:15 +01:00
parent 871b38afcb
commit a05ea535ad
4 changed files with 43 additions and 31 deletions

View File

@ -52,10 +52,6 @@ public class PercolateShardRequest extends BroadcastShardRequest {
this.startTime = request.startTime;
}
public PercolateShardRequest(ShardId shardId, OriginalIndices originalIndices) {
super(shardId, originalIndices);
}
PercolateShardRequest(ShardId shardId, PercolateRequest request) {
super(shardId, request);
this.documentType = request.documentType();

View File

@ -160,12 +160,8 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
int slot = in.readVInt();
OriginalIndices originalIndices = OriginalIndices.readOriginalIndices(in);
PercolateShardRequest shardRequest = new PercolateShardRequest(new ShardId(index, shardId), originalIndices);
shardRequest.documentType(in.readString());
shardRequest.source(in.readBytesReference());
shardRequest.docSource(in.readBytesReference());
shardRequest.onlyCount(in.readBoolean());
PercolateShardRequest shardRequest = new PercolateShardRequest();
shardRequest.readFrom(in);
Item item = new Item(slot, shardRequest);
items.add(item);
}
@ -179,11 +175,7 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
out.writeVInt(items.size());
for (Item item : items) {
out.writeVInt(item.slot);
OriginalIndices.writeOriginalIndices(item.request.originalIndices(), out);
out.writeString(item.request.documentType());
out.writeBytesReference(item.request.source());
out.writeBytesReference(item.request.docSource());
out.writeBoolean(item.request.onlyCount());
item.request.writeTo(out);
}
}

View File

@ -33,12 +33,14 @@ import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.smileBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.yamlBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.percolator.PercolatorTestUtil.convertFromTextArray;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertMatchCount;
@ -363,6 +365,33 @@ public class MultiPercolatorIT extends ESIntegTestCase {
assertEquals(response.getItems()[1].getResponse().getMatches()[0].getId().string(), "Q");
}
public void testStartTimeIsPropagatedToShardRequests() throws Exception {
// See: https://github.com/elastic/elasticsearch/issues/15908
internalCluster().ensureAtLeastNumDataNodes(2);
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
)
.addMapping("type", "date_field", "type=date,format=strict_date_optional_time||epoch_millis")
.get();
ensureGreen();
client().prepareIndex("test", ".percolator", "1")
.setSource(jsonBuilder().startObject().field("query", rangeQuery("date_field").lt("now+90d")).endObject())
.setRefresh(true)
.get();
for (int i = 0; i < 32; i++) {
MultiPercolateResponse response = client().prepareMultiPercolate()
.add(client().preparePercolate().setDocumentType("type").setIndices("test")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc("date_field", "2015-07-21T10:28:01-07:00")))
.get();
assertThat(response.getItems()[0].getResponse().getCount(), equalTo(1L));
assertThat(response.getItems()[0].getResponse().getMatches()[0].getId().string(), equalTo("1"));
}
}
void initNestedIndexAndPercolation() throws IOException {
XContentBuilder mapping = XContentFactory.jsonBuilder();
mapping.startObject().startObject("properties").startObject("companyname").field("type", "string").endObject()

View File

@ -66,14 +66,13 @@ import static org.hamcrest.Matchers.nullValue;
public class PercolateDocumentParserTests extends ESTestCase {
private Index index;
private MapperService mapperService;
private PercolateDocumentParser parser;
private QueryShardContext queryShardContext;
private PercolateShardRequest request;
@Before
public void init() {
index = new Index("_index");
IndexSettings indexSettings = new IndexSettings(new IndexMetaData.Builder("_index").settings(
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@ -97,6 +96,10 @@ public class PercolateDocumentParserTests extends ESTestCase {
parser = new PercolateDocumentParser(
highlightPhase, new SortParseElement(), aggregationPhase, mappingUpdatedAction
);
request = Mockito.mock(PercolateShardRequest.class);
Mockito.when(request.shardId()).thenReturn(new ShardId(new Index("_index"), 0));
Mockito.when(request.documentType()).thenReturn("type");
}
public void testParseDoc() throws Exception {
@ -105,9 +108,7 @@ public class PercolateDocumentParserTests extends ESTestCase {
.field("field1", "value1")
.endObject()
.endObject();
PercolateShardRequest request = new PercolateShardRequest(new ShardId(index, 0), null);
request.documentType("type");
request.source(source.bytes());
Mockito.when(request.source()).thenReturn(source.bytes());
PercolateContext context = new PercolateContext(request, new SearchShardTarget("_node", "_index", 0), mapperService);
ParsedDocument parsedDocument = parser.parse(request, context, mapperService, queryShardContext);
@ -126,9 +127,7 @@ public class PercolateDocumentParserTests extends ESTestCase {
.field("size", 123)
.startObject("sort").startObject("_score").endObject().endObject()
.endObject();
PercolateShardRequest request = new PercolateShardRequest(new ShardId(index, 0), null);
request.documentType("type");
request.source(source.bytes());
Mockito.when(request.source()).thenReturn(source.bytes());
PercolateContext context = new PercolateContext(request, new SearchShardTarget("_node", "_index", 0), mapperService);
ParsedDocument parsedDocument = parser.parse(request, context, mapperService, queryShardContext);
@ -151,10 +150,8 @@ public class PercolateDocumentParserTests extends ESTestCase {
XContentBuilder docSource = jsonBuilder().startObject()
.field("field1", "value1")
.endObject();
PercolateShardRequest request = new PercolateShardRequest(new ShardId(index, 0), null);
request.documentType("type");
request.source(source.bytes());
request.docSource(docSource.bytes());
Mockito.when(request.source()).thenReturn(source.bytes());
Mockito.when(request.docSource()).thenReturn(docSource.bytes());
PercolateContext context = new PercolateContext(request, new SearchShardTarget("_node", "_index", 0), mapperService);
ParsedDocument parsedDocument = parser.parse(request, context, mapperService, queryShardContext);
@ -180,10 +177,8 @@ public class PercolateDocumentParserTests extends ESTestCase {
XContentBuilder docSource = jsonBuilder().startObject()
.field("field1", "value1")
.endObject();
PercolateShardRequest request = new PercolateShardRequest(new ShardId(index, 0), null);
request.documentType("type");
request.source(source.bytes());
request.docSource(docSource.bytes());
Mockito.when(request.source()).thenReturn(source.bytes());
Mockito.when(request.docSource()).thenReturn(docSource.bytes());
PercolateContext context = new PercolateContext(request, new SearchShardTarget("_node", "_index", 0), mapperService);
try {