Percolator: Registering (indexing) a new percolator query will still be stored in memory if actually indexing it fails, closes #1965.

This commit is contained in:
Shay Banon 2012-05-19 19:36:01 +02:00
parent b4512cd471
commit 2c274e59d5
6 changed files with 138 additions and 13 deletions

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.bloom.BloomCache;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@ -95,6 +96,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final ThreadPool threadPool;
private final ShardIndexingService indexingService;
private final IndexSettingsService indexSettingsService;
@Nullable
@ -158,7 +161,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Inject
public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService, @Nullable IndicesWarmer warmer,
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService,
@ -176,6 +179,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.indexingService = indexingService;
this.warmer = (InternalIndicesWarmer) warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
@ -471,6 +475,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
Translog.Location translogLocation = translog.add(new Translog.Create(create));
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
indexingService.postCreateUnderLock(create);
}
}
@ -583,6 +589,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
indexingService.postIndexUnderLock(index);
}
}
@ -685,6 +693,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
}
indexingService.postDeleteUnderLock(delete);
}
}

View File

@ -22,30 +22,78 @@ package org.elasticsearch.index.indexing;
import org.elasticsearch.index.engine.Engine;
/**
*
* An indexing listener for indexing, delete, events.
*/
public abstract class IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
public Engine.Create preCreate(Engine.Create create) {
return create;
}
/**
* Called after the indexing occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postCreateUnderLock(Engine.Create create) {
}
/**
* Called after the indexing operation occurred.
*/
public void postCreate(Engine.Create create) {
}
/**
* Called before the indexing occurs.
*/
public Engine.Index preIndex(Engine.Index index) {
return index;
}
/**
* Called after the indexing occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postIndexUnderLock(Engine.Index index) {
}
/**
* Called after the indexing operation occurred.
*/
public void postIndex(Engine.Index index) {
}
/**
* Called before the delete occurs.
*/
public Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
/**
* Called after the delete occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postDeleteUnderLock(Engine.Delete delete) {
}
/**
* Called after the delete operation occurred.
*/
public void postDelete(Engine.Delete delete) {
}

View File

@ -103,13 +103,29 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return create;
}
public void postCreateUnderLock(Engine.Create create) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreateUnderLock(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
typeStats(create.type()).indexMetric.inc(took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postCreate(create);
try {
listener.postCreate(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
@ -125,6 +141,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return index;
}
public void postIndexUnderLock(Engine.Index index) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndexUnderLock(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
public void postIndex(Engine.Index index) {
long took = index.endTime() - index.startTime();
totalStats.indexMetric.inc(took);
@ -134,7 +162,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats.indexCurrent.dec();
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postIndex(index);
try {
listener.postIndex(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
@ -155,6 +187,18 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
return delete;
}
public void postDeleteUnderLock(Engine.Delete delete) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postDeleteUnderLock(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
public void postDelete(Engine.Delete delete) {
long took = delete.endTime() - delete.startTime();
totalStats.deleteMetric.inc(took);
@ -164,7 +208,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
typeStats.deleteCurrent.dec();
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postDelete(delete);
try {
listener.postDelete(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}

View File

@ -223,7 +223,7 @@ public class PercolatorExecutor extends AbstractIndexComponent {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken(); // move the START_OBJECT
if (token != XContentParser.Token.START_OBJECT) {
throw new ElasticSearchException("Failed to add query [" + name + "], not starting with OBJECT");
throw new ElasticSearchException("failed to parse query [" + name + "], not starting with OBJECT");
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -240,8 +240,8 @@ public class PercolatorExecutor extends AbstractIndexComponent {
}
}
return query;
} catch (IOException e) {
throw new ElasticSearchException("Failed to add query [" + name + "]", e);
} catch (Exception e) {
throw new ElasticSearchException("failed to parse query [" + name + "]", e);
} finally {
if (parser != null) {
parser.close();

View File

@ -257,26 +257,44 @@ public class PercolatorService extends AbstractIndexComponent {
@Override
public Engine.Create preCreate(Engine.Create create) {
// validate the query here, before we index
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
percolator.parseQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
}
return create;
}
@Override
public void postCreateUnderLock(Engine.Create create) {
// add the query under a doc lock
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
}
}
@Override
public Engine.Index preIndex(Engine.Index index) {
// validate the query here, before we index
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
percolator.parseQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
}
return index;
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public void postIndexUnderLock(Engine.Index index) {
// add the query under a doc lock
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
}
}
@Override
public void postDeleteUnderLock(Engine.Delete delete) {
// remove the query under a lock
if (delete.type().equals(index().name())) {
percolator.removeQuery(delete.id());
}
return delete;
}
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bloom.none.NoneBloomCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
@ -38,7 +39,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
}
}