diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java index c3484f24649..929bdf525dc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java @@ -60,6 +60,8 @@ public class PercolatorService extends AbstractIndexComponent { private final ShardLifecycleListener shardLifecycleListener; + private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener(); + private final Object mutex = new Object(); private boolean initialQueriesFetchDone = false; @@ -76,6 +78,18 @@ public class PercolatorService extends AbstractIndexComponent { public void close() { this.indicesService.indicesLifecycle().removeListener(shardLifecycleListener); + + // clean up any index that has registered real time updated from the percolator shards allocated on this node + IndexService percolatorIndexService = percolatorIndexService(); + if (percolatorIndexService != null) { + for (IndexShard indexShard : percolatorIndexService) { + try { + indexShard.removeListener(realTimePercolatorOperationListener); + } catch (Exception e) { + // ignore + } + } + } } public PercolatorExecutor.Response percolate(PercolatorExecutor.SourceRequest request) throws PercolatorException { @@ -165,8 +179,10 @@ public class PercolatorService extends AbstractIndexComponent { class ShardLifecycleListener extends IndicesLifecycle.Listener { @Override public void afterIndexShardCreated(IndexShard indexShard) { + // add a listener that will update based on changes done to the _percolate index + // the relevant indices with loaded queries if (indexShard.shardId().index().name().equals(INDEX_NAME)) { - indexShard.addListener(new RealTimePercolatorOperationListener()); + indexShard.addListener(realTimePercolatorOperationListener); } } @@ -180,7 +196,10 @@ public class PercolatorService extends AbstractIndexComponent { } // we load the queries for all existing indices for (IndexService indexService : indicesService) { - loadQueries(indexService.index().name()); + // only load queries for "this" index percolator service + if (indexService.index().equals(index())) { + loadQueries(indexService.index().name()); + } } initialQueriesFetchDone = true; } @@ -211,17 +230,23 @@ public class PercolatorService extends AbstractIndexComponent { class RealTimePercolatorOperationListener extends OperationListener { @Override public Engine.Create beforeCreate(Engine.Create create) { - percolator.addQuery(create.id(), create.source()); + if (create.type().equals(index().name())) { + percolator.addQuery(create.id(), create.source()); + } return create; } @Override public Engine.Index beforeIndex(Engine.Index index) { - percolator.addQuery(index.id(), index.source()); + if (index.type().equals(index().name())) { + percolator.addQuery(index.id(), index.source()); + } return index; } @Override public Engine.Delete beforeDelete(Engine.Delete delete) { - percolator.removeQuery(delete.id()); + if (delete.type().equals(index().name())) { + percolator.removeQuery(delete.id()); + } return delete; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index dbb1c48ddf4..4f4686d3493 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -442,6 +442,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } public void close(String reason) { + listeners.clear(); + listeners = null; indexSettingsService.removeListener(applyRefreshSettings); synchronized (mutex) { if (state != IndexShardState.CLOSED) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index cb41203aa0c..adf70505581 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -58,6 +58,45 @@ public class SimplePercolatorTests extends AbstractNodesTests { return client("node1"); } + @Test public void percolateOnRecreatedIndex() throws Exception { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // ignore + } + try { + client.admin().indices().prepareDelete("_percolator").execute().actionGet(); + } catch (Exception e) { + // ignore + } + + client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + client.prepareIndex("test", "test", "1").setSource("field1", "value1").execute().actionGet(); + + logger.info("--> register a query"); + client.prepareIndex("_percolator", "test", "kuku") + .setSource(jsonBuilder().startObject() + .field("color", "blue") + .field("query", termQuery("field1", "value1")) + .endObject()) + .setRefresh(true) + .execute().actionGet(); + + client.admin().indices().prepareDelete("test").execute().actionGet(); + + client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + client.prepareIndex("test", "test", "1").setSource("field1", "value1").execute().actionGet(); + + logger.info("--> register a query"); + client.prepareIndex("_percolator", "test", "kuku") + .setSource(jsonBuilder().startObject() + .field("color", "blue") + .field("query", termQuery("field1", "value1")) + .endObject()) + .setRefresh(true) + .execute().actionGet(); + } + @Test public void registerPercolatorAndThenCreateAnIndex() throws Exception { try { client.admin().indices().prepareDelete("test").execute().actionGet();