Percolator failure when deleting and creating an index, also causes problem with percolated queries isolation between different indices, closes #790.

This commit is contained in:
kimchy 2011-03-18 10:49:28 +02:00
parent 616b3dcb18
commit 923fcf239c
3 changed files with 71 additions and 5 deletions

View File

@ -60,6 +60,8 @@ public class PercolatorService extends AbstractIndexComponent {
private final ShardLifecycleListener shardLifecycleListener; private final ShardLifecycleListener shardLifecycleListener;
private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
private final Object mutex = new Object(); private final Object mutex = new Object();
private boolean initialQueriesFetchDone = false; private boolean initialQueriesFetchDone = false;
@ -76,6 +78,18 @@ public class PercolatorService extends AbstractIndexComponent {
public void close() { public void close() {
this.indicesService.indicesLifecycle().removeListener(shardLifecycleListener); this.indicesService.indicesLifecycle().removeListener(shardLifecycleListener);
// clean up any index that has registered real time updated from the percolator shards allocated on this node
IndexService percolatorIndexService = percolatorIndexService();
if (percolatorIndexService != null) {
for (IndexShard indexShard : percolatorIndexService) {
try {
indexShard.removeListener(realTimePercolatorOperationListener);
} catch (Exception e) {
// ignore
}
}
}
} }
public PercolatorExecutor.Response percolate(PercolatorExecutor.SourceRequest request) throws PercolatorException { public PercolatorExecutor.Response percolate(PercolatorExecutor.SourceRequest request) throws PercolatorException {
@ -165,8 +179,10 @@ public class PercolatorService extends AbstractIndexComponent {
class ShardLifecycleListener extends IndicesLifecycle.Listener { class ShardLifecycleListener extends IndicesLifecycle.Listener {
@Override public void afterIndexShardCreated(IndexShard indexShard) { @Override public void afterIndexShardCreated(IndexShard indexShard) {
// add a listener that will update based on changes done to the _percolate index
// the relevant indices with loaded queries
if (indexShard.shardId().index().name().equals(INDEX_NAME)) { if (indexShard.shardId().index().name().equals(INDEX_NAME)) {
indexShard.addListener(new RealTimePercolatorOperationListener()); indexShard.addListener(realTimePercolatorOperationListener);
} }
} }
@ -180,8 +196,11 @@ public class PercolatorService extends AbstractIndexComponent {
} }
// we load the queries for all existing indices // we load the queries for all existing indices
for (IndexService indexService : indicesService) { for (IndexService indexService : indicesService) {
// only load queries for "this" index percolator service
if (indexService.index().equals(index())) {
loadQueries(indexService.index().name()); loadQueries(indexService.index().name());
} }
}
initialQueriesFetchDone = true; initialQueriesFetchDone = true;
} }
} }
@ -211,17 +230,23 @@ public class PercolatorService extends AbstractIndexComponent {
class RealTimePercolatorOperationListener extends OperationListener { class RealTimePercolatorOperationListener extends OperationListener {
@Override public Engine.Create beforeCreate(Engine.Create create) { @Override public Engine.Create beforeCreate(Engine.Create create) {
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source()); percolator.addQuery(create.id(), create.source());
}
return create; return create;
} }
@Override public Engine.Index beforeIndex(Engine.Index index) { @Override public Engine.Index beforeIndex(Engine.Index index) {
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source()); percolator.addQuery(index.id(), index.source());
}
return index; return index;
} }
@Override public Engine.Delete beforeDelete(Engine.Delete delete) { @Override public Engine.Delete beforeDelete(Engine.Delete delete) {
if (delete.type().equals(index().name())) {
percolator.removeQuery(delete.id()); percolator.removeQuery(delete.id());
}
return delete; return delete;
} }
} }

View File

@ -442,6 +442,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
public void close(String reason) { public void close(String reason) {
listeners.clear();
listeners = null;
indexSettingsService.removeListener(applyRefreshSettings); indexSettingsService.removeListener(applyRefreshSettings);
synchronized (mutex) { synchronized (mutex) {
if (state != IndexShardState.CLOSED) { if (state != IndexShardState.CLOSED) {

View File

@ -58,6 +58,45 @@ public class SimplePercolatorTests extends AbstractNodesTests {
return client("node1"); return client("node1");
} }
@Test public void percolateOnRecreatedIndex() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
try {
client.admin().indices().prepareDelete("_percolator").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.prepareIndex("test", "test", "1").setSource("field1", "value1").execute().actionGet();
logger.info("--> register a query");
client.prepareIndex("_percolator", "test", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
.endObject())
.setRefresh(true)
.execute().actionGet();
client.admin().indices().prepareDelete("test").execute().actionGet();
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.prepareIndex("test", "test", "1").setSource("field1", "value1").execute().actionGet();
logger.info("--> register a query");
client.prepareIndex("_percolator", "test", "kuku")
.setSource(jsonBuilder().startObject()
.field("color", "blue")
.field("query", termQuery("field1", "value1"))
.endObject())
.setRefresh(true)
.execute().actionGet();
}
@Test public void registerPercolatorAndThenCreateAnIndex() throws Exception { @Test public void registerPercolatorAndThenCreateAnIndex() throws Exception {
try { try {
client.admin().indices().prepareDelete("test").execute().actionGet(); client.admin().indices().prepareDelete("test").execute().actionGet();