Fix searching while shard is being relocated

Shard relocation caused queries to fail in the fetch phase either with a
`AlreadyClosedException` or a `SearchContextMissingException`.
This was caused by the `SearchService` releasing the `SearchContext`s via the
superfluous call of `releaseContextsForShard()` when a shard was closed locally after relocating to another node.
This commit is contained in:
Britta Weber 2013-08-27 17:39:28 +02:00
parent e7ff8ea509
commit 1ab037d4d0
2 changed files with 120 additions and 38 deletions

View File

@ -38,11 +38,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
@ -90,16 +88,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final FetchPhase fetchPhase;
private final long defaultKeepAlive;
private final ScheduledFuture keepAliveReaper;
private final ScheduledFuture<?> keepAliveReaper;
private final AtomicLong idGenerator = new AtomicLong();
private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener();
private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
private final ImmutableMap<String, SearchParseElement> elementParsers;
@ -128,7 +122,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
elementParsers.putAll(fetchPhase.parseElements());
elementParsers.put("stats", new StatsGroupsParseElement());
this.elementParsers = ImmutableMap.copyOf(elementParsers);
indicesLifecycle.addListener(indicesLifecycleListener);
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
@ -150,23 +143,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
// Shuts the search service down: stops the periodic keep-alive reaper and
// detaches the lifecycle listener so closing indices/shards no longer frees
// active search contexts.
@Override
protected void doClose() throws ElasticSearchException {
// false: do not interrupt a reaper run that is already in flight
keepAliveReaper.cancel(false);
indicesService.indicesLifecycle().removeListener(indicesLifecycleListener);
}
// Frees every active search context whose target shard belongs to the given
// index. Iteration over the concurrent map is weakly consistent, so contexts
// registered concurrently may be missed; the keep-alive reaper reclaims those.
public void releaseContextsForIndex(Index index) {
for (SearchContext context : activeContexts.values()) {
if (context.shardTarget().index().equals(index.name())) {
freeContext(context);
}
}
}
// Frees every active search context targeting exactly the given shard
// (matched by index name AND shard id). NOTE(review): per the commit message,
// calling this when a shard closes during relocation is what caused fetch
// phases to fail with AlreadyClosedException/SearchContextMissingException.
public void releaseContextsForShard(ShardId shardId) {
for (SearchContext context : activeContexts.values()) {
if (context.shardTarget().index().equals(shardId.index().name()) && context.shardTarget().shardId() == shardId.id()) {
freeContext(context);
}
}
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticSearchException {
@ -668,19 +644,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
// Lifecycle hook that eagerly frees search contexts when an index or shard is
// closed. NOTE(review): the surrounding diff removes this listener — freeing
// contexts on shard close broke in-flight fetch phases during relocation
// (see the commit message at the top of this page).
class CleanContextOnIndicesLifecycleListener extends IndicesLifecycle.Listener {
@Override
public void beforeIndexClosed(IndexService indexService) {
// drop all contexts for every shard of the closing index
releaseContextsForIndex(indexService.index());
}
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
// indexShard may be null if the shard never fully initialized
releaseContextsForShard(shardId);
}
}
class Reaper implements Runnable {
@Override
public void run() {

View File

@ -0,0 +1,119 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.basic;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.client.Requests.searchRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.functionScoreQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders.gaussDecayFunction;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.equalTo;
/**
 * Integration test: runs QUERY_THEN_FETCH searches in a background thread
 * while shards relocate between nodes, asserting that no search throws and
 * that every search returns the full document set.
 */
public class SearchWhileRelocatingTests extends AbstractSharedClusterTest {

    @Override
    protected int numberOfNodes() {
        // Three nodes so allowNodes(...) can force shards to move around.
        return 3;
    }

    @Test
    public void testSearchAndRelocateConcurrently() throws Exception {
        final int numShards = between(10, 20);
        // Mapping for type "type": a geo_point field used by the decay
        // function and a string field used by the term query.
        String mapping = XContentFactory.jsonBuilder().
                startObject().
                startObject("type").
                startObject("properties").
                startObject("loc").field("type", "geo_point").endObject().
                startObject("test").field("type", "string").endObject().
                endObject().
                endObject()
                .endObject().string();
        client().admin().indices().prepareCreate("test")
                .setSettings(settingsBuilder().put("index.number_of_shards", numShards).put("index.number_of_replicas", 0))
                // FIX: register the mapping under type "type" — the mapping JSON's
                // root key and the type used by setType("type") below. It was
                // previously added as "type1", so the geo_point mapping for "loc"
                // never applied to the indexed documents.
                .addMapping("type", mapping).execute().actionGet();
        ensureYellow();

        // Index a small, known number of docs so hit counts are exact.
        List<IndexRequestBuilder> indexBuilders = new ArrayList<IndexRequestBuilder>();
        final int numDocs = between(10, 20);
        for (int i = 0; i < numDocs; i++) {
            indexBuilders.add(new IndexRequestBuilder(client())
                    .setType("type")
                    .setId(Integer.toString(i))
                    .setIndex("test")
                    .setSource(
                            jsonBuilder().startObject().field("test", "value").startObject("loc").field("lat", 11).field("lon", 21)
                                    .endObject().endObject()));
        }
        indexRandom("test", true, indexBuilders.toArray(new IndexRequestBuilder[indexBuilders.size()]));

        final int numIters = atLeast(3);
        for (int i = 0; i < numIters; i++) {
            // Restrict the index to a random subset of nodes to trigger relocations.
            allowNodes("test", between(1, 3));
            client().admin().cluster().prepareReroute().get();
            final AtomicBoolean stop = new AtomicBoolean(false);
            // Collected from the search thread; CopyOnWriteArrayList is safe for
            // cross-thread writes and the final read after join().
            final List<Throwable> thrownExceptions = new CopyOnWriteArrayList<Throwable>();
            final Thread t = new Thread() {
                public void run() {
                    final List<Float> lonlat = new ArrayList<Float>();
                    // valueOf instead of deprecated new Float(...) boxing ctor
                    lonlat.add(Float.valueOf(20));
                    lonlat.add(Float.valueOf(11));
                    try {
                        // Hammer searches until the relocation round settles.
                        while (!stop.get()) {
                            SearchResponse sr = client().search(
                                    searchRequest().searchType(SearchType.QUERY_THEN_FETCH).source(
                                            searchSource().size(numDocs).query(
                                                    functionScoreQuery(termQuery("test", "value"),
                                                            gaussDecayFunction("loc", lonlat, "1000km")).boostMode(
                                                            CombineFunction.MULT.getName())))).get();
                            final SearchHits sh = sr.getHits();
                            // Every search must see all docs, and the fetch phase
                            // must return as many hits as the query phase counted.
                            assertThat("Expect num docs in getTotalHits() ", sh.getTotalHits(), equalTo((long) (numDocs)));
                            assertThat("Expected hits to be the same size the actual hits array", sh.getTotalHits(),
                                    equalTo((long) (sh.getHits().length)));
                        }
                    } catch (Throwable t) {
                        thrownExceptions.add(t);
                    }
                }
            };
            t.start();
            // Block until relocations triggered above have finished.
            ClusterHealthResponse resp = client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet();
            stop.set(true);
            t.join();
            assertThat(resp.isTimedOut(), equalTo(false));
            assertThat("failed in iteration " + i, thrownExceptions, Matchers.emptyIterable());
        }
    }
}