Search: Remove query-cache serialization optimization.
The query-cache has an optimization to not deserialize the bytes at the shard level. However this is a bit fragile since it assumes that serialized streams can be concatenanted (which is not the case with shared strings) and also does not update the QueryResult object that is held by the SearchContext. So you need to make sure to use the right one. With this change, the query cache just deserializes bytes into the QueryResult object from the context. Close #9500
This commit is contained in:
parent
fb377d48bd
commit
00d54fabb2
|
@ -32,37 +32,26 @@ import org.apache.lucene.index.IndexReader;
|
|||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.PagedBytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.MemorySizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
import org.elasticsearch.search.query.QueryPhase;
|
||||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.search.query.QuerySearchResultProvider;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -217,11 +206,12 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
|
|||
}
|
||||
|
||||
/**
|
||||
* Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows
|
||||
* Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
|
||||
* value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
|
||||
* to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse
|
||||
* the same cache.
|
||||
*/
|
||||
public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
|
||||
public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
|
||||
assert canCache(request, context);
|
||||
Key key = buildKey(request, context);
|
||||
Loader loader = new Loader(queryPhase, context, key);
|
||||
|
@ -238,10 +228,11 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
|
|||
}
|
||||
} else {
|
||||
key.shard.queryCache().onHit();
|
||||
// restore the cached query result into the context
|
||||
final QuerySearchResult result = context.queryResult();
|
||||
result.readFromWithId(context.id(), value.reference.streamInput());
|
||||
result.shardTarget(context.shardTarget());
|
||||
}
|
||||
|
||||
// try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution
|
||||
return new BytesQuerySearchResult(context.id(), context.shardTarget(), value.reference, loader.isLoaded() ? context.queryResult() : null);
|
||||
}
|
||||
|
||||
private static class Loader implements Callable<Value> {
|
||||
|
@ -278,7 +269,6 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
|
|||
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
|
||||
// the memory properly paged instead of having varied sized bytes
|
||||
final BytesReference reference = out.bytes();
|
||||
assert verifyCacheSerializationSameAsQueryResult(reference, context, context.queryResult());
|
||||
loaded = true;
|
||||
Value value = new Value(reference, out.ramBytesUsed());
|
||||
key.shard.queryCache().onCached(key, value);
|
||||
|
@ -459,14 +449,6 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception {
|
||||
BytesStreamOutput out1 = new BytesStreamOutput();
|
||||
new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData).writeTo(out1);
|
||||
BytesStreamOutput out2 = new BytesStreamOutput();
|
||||
result.writeTo(out2);
|
||||
return out1.bytes().equals(out2.bytes());
|
||||
}
|
||||
|
||||
private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
|
||||
// TODO: for now, this will create different keys for different JSON order
|
||||
// TODO: tricky to get around this, need to parse and order all, which can be expensive
|
||||
|
@ -474,74 +456,4 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe
|
|||
((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
|
||||
request.cacheKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* this class aim is to just provide an on the wire *write* format that is the same as {@link QuerySearchResult}
|
||||
* and also provide a nice wrapper for in node communication for an already constructed {@link QuerySearchResult}.
|
||||
*/
|
||||
private static class BytesQuerySearchResult extends QuerySearchResultProvider {
|
||||
|
||||
private long id;
|
||||
private SearchShardTarget shardTarget;
|
||||
private BytesReference data;
|
||||
|
||||
private transient QuerySearchResult result;
|
||||
|
||||
private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) {
|
||||
this(id, shardTarget, data, null);
|
||||
}
|
||||
|
||||
private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) {
|
||||
this.id = id;
|
||||
this.shardTarget = shardTarget;
|
||||
this.data = data;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean includeFetch() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QuerySearchResult queryResult() {
|
||||
if (result == null) {
|
||||
result = new QuerySearchResult(id, shardTarget);
|
||||
try {
|
||||
result.readFromWithId(id, data.streamInput());
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchParseException("failed to parse a cached query", e);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long id() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchShardTarget shardTarget() {
|
||||
return shardTarget;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shardTarget(SearchShardTarget shardTarget) {
|
||||
this.shardTarget = shardTarget;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new ElasticsearchIllegalStateException("readFrom should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(id);
|
||||
// shardTarget.writeTo(out); not needed
|
||||
data.writeTo(out); // we need to write teh bytes as is, to be the same as QuerySearchResult
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.carrotsearch.hppc.ObjectSet;
|
|||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import org.apache.lucene.index.IndexOptions;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
|
@ -271,6 +272,19 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
|
||||
*/
|
||||
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context,
|
||||
final QueryPhase queryPhase) throws Exception {
|
||||
final boolean canCache = indicesQueryCache.canCache(request, context);
|
||||
if (canCache) {
|
||||
indicesQueryCache.loadIntoContext(request, context, queryPhase);
|
||||
} else {
|
||||
queryPhase.execute(context);
|
||||
}
|
||||
}
|
||||
|
||||
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
|
||||
final SearchContext context = createAndPutContext(request);
|
||||
try {
|
||||
|
@ -278,14 +292,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
long time = System.nanoTime();
|
||||
contextProcessing(context);
|
||||
|
||||
QuerySearchResultProvider result;
|
||||
boolean canCache = indicesQueryCache.canCache(request, context);
|
||||
if (canCache) {
|
||||
result = indicesQueryCache.load(request, context, queryPhase);
|
||||
} else {
|
||||
queryPhase.execute(context);
|
||||
result = context.queryResult();
|
||||
}
|
||||
loadOrExecuteQueryPhase(request, context, queryPhase);
|
||||
|
||||
if (context.searchType() == SearchType.COUNT) {
|
||||
freeContext(context.id());
|
||||
|
@ -294,7 +301,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
}
|
||||
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
|
||||
|
||||
return result;
|
||||
return context.queryResult();
|
||||
} catch (Throwable e) {
|
||||
// execution exception can happen while loading the cache, strip it
|
||||
if (e instanceof ExecutionException) {
|
||||
|
@ -989,11 +996,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
if (canCache != top) {
|
||||
return;
|
||||
}
|
||||
if (canCache) {
|
||||
indicesQueryCache.load(request, context, queryPhase);
|
||||
} else {
|
||||
queryPhase.execute(context);
|
||||
}
|
||||
loadOrExecuteQueryPhase(request, context, queryPhase);
|
||||
long took = System.nanoTime() - now;
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
|
||||
|
|
75
src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java
vendored
Normal file
75
src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java
vendored
Normal file
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.cache.query;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class IndicesQueryCacheTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
// One of the primary purposes of the query cache is to cache aggs results
|
||||
public void testCacheAggs() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("index")
|
||||
.addMapping("type", "f", "type=date")
|
||||
.setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get());
|
||||
indexRandom(true,
|
||||
client().prepareIndex("index", "type").setSource("f", "2014-03-10T00:00:00.000Z"),
|
||||
client().prepareIndex("index", "type").setSource("f", "2014-05-13T00:00:00.000Z"));
|
||||
|
||||
// This is not a random example: serialization with time zones writes shared strings
|
||||
// which used to not work well with the query cache because of the handles stream output
|
||||
// see #9500
|
||||
final SearchResponse r1 = client().prepareSearch("index").setSearchType(SearchType.COUNT)
|
||||
.addAggregation(dateHistogram("histo").field("f").preZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
|
||||
assertSearchResponse(r1);
|
||||
|
||||
// The cached is actually used
|
||||
assertThat(client().admin().indices().prepareStats("index").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
final SearchResponse r2 = client().prepareSearch("index").setSearchType(SearchType.COUNT)
|
||||
.addAggregation(dateHistogram("histo").field("f").preZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
|
||||
assertSearchResponse(r2);
|
||||
Histogram h1 = r1.getAggregations().get("histo");
|
||||
Histogram h2 = r2.getAggregations().get("histo");
|
||||
final List<? extends Bucket> buckets1 = h1.getBuckets();
|
||||
final List<? extends Bucket> buckets2 = h2.getBuckets();
|
||||
assertEquals(buckets1.size(), buckets2.size());
|
||||
for (int j = 0; j < buckets1.size(); ++j) {
|
||||
final Bucket b1 = buckets1.get(j);
|
||||
final Bucket b2 = buckets2.get(j);
|
||||
assertEquals(b1.getKey(), b2.getKey());
|
||||
assertEquals(b1.getDocCount(), b2.getDocCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue