From 8c50a65699c4f2f8e8fff1a5793c76c7fbbff4c2 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 6 Apr 2011 21:00:49 +0300 Subject: [PATCH] Percolator doesn't work correctly after index recreation, closes #837. --- .../index/percolator/PercolatorService.java | 33 ++-- .../percolator/RecoveryPercolatorTests.java | 175 ++++++++++++++++++ 2 files changed, 197 insertions(+), 11 deletions(-) create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java index 644368bdabf..e626fdf0e8a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java @@ -30,12 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.field.data.FieldData; -import org.elasticsearch.index.field.data.FieldDataType; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldSelector; -import org.elasticsearch.index.mapper.TypeFieldMapper; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShardState; @@ -74,6 +69,20 @@ public class PercolatorService extends AbstractIndexComponent { this.shardLifecycleListener = new ShardLifecycleListener(); this.indicesService.indicesLifecycle().addListener(shardLifecycleListener); this.percolator.setIndicesService(indicesService); + + // if percolator is already allocated, make sure to register real time percolation + if (percolatorAllocated()) { + IndexService percolatorIndexService = percolatorIndexService(); + if (percolatorIndexService != null) { + for (IndexShard indexShard : percolatorIndexService) { + try { + indexShard.addListener(realTimePercolatorOperationListener); + } catch (Exception e) { + // ignore + } + } + } + } } public void close() { @@ -103,6 +112,7 @@ public class PercolatorService extends AbstractIndexComponent { private void loadQueries(String indexName) { IndexService indexService = percolatorIndexService(); IndexShard shard = indexService.shard(0); + shard.refresh(new Engine.Refresh(true)); Engine.Searcher searcher = shard.searcher(); try { // create a query to fetch all queries that are registered under the index name (which is the type @@ -141,8 +151,6 @@ public class PercolatorService extends AbstractIndexComponent { class QueriesLoaderCollector extends Collector { - private FieldData fieldData; - private IndexReader reader; private Map queries = Maps.newHashMap(); @@ -155,9 +163,9 @@ public class PercolatorService extends AbstractIndexComponent { } @Override public void collect(int doc) throws IOException { - String id = fieldData.stringValue(doc); // the _source is the query - Document document = reader.document(doc, SourceFieldSelector.INSTANCE); + Document document = reader.document(doc, new UidAndSourceFieldSelector()); + String id = Uid.createUid(document.get(UidFieldMapper.NAME)).id(); byte[] source = document.getBinaryValue(SourceFieldMapper.NAME); try { queries.put(id, percolator.parseQuery(id, source, 0, source.length)); @@ -168,7 +176,6 @@ public class PercolatorService extends AbstractIndexComponent { @Override public void setNextReader(IndexReader reader, int docBase) throws IOException { this.reader = reader; - fieldData = percolatorIndexService().cache().fieldData().cache(FieldDataType.DefaultTypes.STRING, reader, IdFieldMapper.NAME); } @Override public boolean acceptsDocsOutOfOrder() { @@ -198,7 +205,9 @@ public class PercolatorService extends AbstractIndexComponent { for (IndexService indexService : indicesService) { // only load queries for "this" index percolator service if (indexService.index().equals(index())) { + logger.debug("loading percolator queries for index [{}]...", indexService.index().name()); loadQueries(indexService.index().name()); + logger.trace("done loading percolator queries for index [{}]", indexService.index().name()); } } initialQueriesFetchDone = true; @@ -225,7 +234,9 @@ public class PercolatorService extends AbstractIndexComponent { return; } // we load queries for this index + logger.debug("loading percolator queries for index [{}]...", indexService.index().name()); loadQueries(index.name()); + logger.trace("done loading percolator queries for index [{}]", indexService.index().name()); initialQueriesFetchDone = true; } } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java new file mode 100644 index 00000000000..c3442e42054 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java @@ -0,0 +1,175 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.percolator; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.percolate.PercolateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import static org.elasticsearch.client.Requests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.elasticsearch.common.xcontent.XContentFactory.*; +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +@Test +public class RecoveryPercolatorTests extends AbstractNodesTests { + + @AfterMethod public void cleanAndCloseNodes() throws Exception { + for (int i = 0; i < 10; i++) { + if (node("node" + i) != null) { + node("node" + i).stop(); + // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well + if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) { + ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); + } + } + } + closeAllNodes(); + } + + @Test public void testRestartNodePercolator1() throws Exception { + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local")); + cleanAndCloseNodes(); + + logger.info("--> starting 1 nodes"); + startNode("node1", settingsBuilder().put("gateway.type", "local")); + + Client client = client("node1"); + client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + + logger.info("--> register a query"); + client.prepareIndex("_percolator", "test", "kuku") + .setSource(jsonBuilder().startObject() + .field("color", "blue") + .field("query", termQuery("field1", "value1")) + .endObject()) + .setRefresh(true) + .execute().actionGet(); + + PercolateResponse percolate = client.preparePercolate("test", "type1").setSource(jsonBuilder().startObject().startObject("doc") + .field("field1", "value1") + .endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + + client.close(); + closeNode("node1"); + + startNode("node1", settingsBuilder().put("gateway.type", "local").build()); + client = client("node1"); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + percolate = client.preparePercolate("test", "type1").setSource(jsonBuilder().startObject().startObject("doc") + .field("field1", "value1") + .endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + } + + @Test public void testRestartNodePercolator2() throws Exception { + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local")); + cleanAndCloseNodes(); + + logger.info("--> starting 1 nodes"); + startNode("node1", settingsBuilder().put("gateway.type", "local")); + + Client client = client("node1"); + client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + + logger.info("--> register a query"); + client.prepareIndex("_percolator", "test", "kuku") + .setSource(jsonBuilder().startObject() + .field("color", "blue") + .field("query", termQuery("field1", "value1")) + .endObject()) + .setRefresh(true) + .execute().actionGet(); + + assertThat(client.prepareCount("_percolator").setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l)); + + PercolateResponse percolate = client.preparePercolate("test", "type1").setSource(jsonBuilder().startObject().startObject("doc") + .field("field1", "value1") + .endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + + client.close(); + closeNode("node1"); + + startNode("node1", settingsBuilder().put("gateway.type", "local").build()); + client = client("node1"); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + assertThat(client.prepareCount("_percolator").setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l)); + + client.admin().indices().prepareDelete("test").execute().actionGet(); + client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + assertThat(client.prepareCount("_percolator").setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(0l)); + + percolate = client.preparePercolate("test", "type1").setSource(jsonBuilder().startObject().startObject("doc") + .field("field1", "value1") + .endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(0)); + + logger.info("--> register a query"); + client.prepareIndex("_percolator", "test", "kuku") + .setSource(jsonBuilder().startObject() + .field("color", "blue") + .field("query", termQuery("field1", "value1")) + .endObject()) + .setRefresh(true) + .execute().actionGet(); + + assertThat(client.prepareCount("_percolator").setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l)); + + percolate = client.preparePercolate("test", "type1").setSource(jsonBuilder().startObject().startObject("doc") + .field("field1", "value1") + .endObject().endObject()) + .execute().actionGet(); + assertThat(percolate.matches().size(), equalTo(1)); + } +} \ No newline at end of file