From d5d4c9b140dbc8e7a0c4c1ffbdd2b1c773302fcb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 25 Sep 2015 15:43:55 +0200 Subject: [PATCH] Remove ES internal deletion policies in favour of Lucenes implementations These classes are really duplicates and are just here for historical reasons. We don't need these anymore since the same classes exist in lucene today. This also removes the guice injection for DeletionPolicy and make them shard private. --- .../org/elasticsearch/index/IndexService.java | 3 - .../AbstractESDeletionPolicy.java | 60 ----- .../deletionpolicy/DeletionPolicyModule.java | 38 --- .../KeepLastNDeletionPolicy.java | 63 ----- .../KeepOnlyLastDeletionPolicy.java | 65 ------ .../SnapshotDeletionPolicy.java | 220 ------------------ .../deletionpolicy/SnapshotIndexCommit.java | 74 ------ .../deletionpolicy/SnapshotIndexCommits.java | 55 ----- .../elasticsearch/index/engine/Engine.java | 16 +- .../index/engine/EngineConfig.java | 4 +- .../index/engine/InternalEngine.java | 16 +- .../index/engine/ShadowEngine.java | 6 +- .../elasticsearch/index/shard/IndexShard.java | 28 ++- .../index/shard/ShadowIndexShard.java | 6 +- .../index/snapshots/IndexShardRepository.java | 4 +- .../BlobStoreIndexShardRepository.java | 25 +- .../recovery/RecoverySourceHandler.java | 17 +- .../repositories/Repository.java | 5 +- .../snapshots/SnapshotShardsService.java | 6 +- .../SnapshotDeletionPolicyTests.java | 180 -------------- .../SnapshotIndexCommitExistsMatcher.java | 58 ----- .../index/engine/InternalEngineTests.java | 9 +- .../index/engine/ShadowEngineTests.java | 9 +- .../elasticsearch/index/store/StoreTests.java | 4 +- 24 files changed, 64 insertions(+), 907 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java delete mode 100644 core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java delete mode 100644 core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicyTests.java delete mode 100644 core/src/test/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommitExistsMatcher.java diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index cf43e6c2e8c..85483e2dc5a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; -import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; @@ -365,8 +364,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone injector.getInstance(IndicesQueryCache.class).onClose(shardId); } }), path)); - modules.add(new DeletionPolicyModule()); - pluginsService.processModules(modules); try { diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java deleted file mode 100644 index 23296853482..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/AbstractESDeletionPolicy.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexDeletionPolicy; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.IndexShardComponent; -import org.elasticsearch.index.shard.ShardId; - -abstract class AbstractESDeletionPolicy extends IndexDeletionPolicy implements IndexShardComponent { - - protected final ESLogger logger; - - protected final ShardId shardId; - - protected final Settings indexSettings; - - protected AbstractESDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) { - this.shardId = shardId; - this.indexSettings = indexSettings; - this.logger = Loggers.getLogger(getClass(), indexSettings, shardId); - } - - @Override - public ShardId shardId() { - return this.shardId; - } - - @Override - public Settings indexSettings() { - return this.indexSettings; - } - - public String nodeName() { - return indexSettings.get("name", ""); - } - - - -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java deleted file mode 100644 index 55ec61fca93..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/DeletionPolicyModule.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexDeletionPolicy; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.name.Names; - -public class DeletionPolicyModule extends AbstractModule { - - @Override - protected void configure() { - bind(IndexDeletionPolicy.class) - .annotatedWith(Names.named("actual")) - .to(KeepOnlyLastDeletionPolicy.class) - .asEagerSingleton(); - - bind(SnapshotDeletionPolicy.class) - .asEagerSingleton(); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java deleted file mode 100644 index f9b3b8901d9..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepLastNDeletionPolicy.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.List; - -/** - * - */ -public class KeepLastNDeletionPolicy extends AbstractESDeletionPolicy { - - private final int numToKeep; - - @Inject - public KeepLastNDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); - this.numToKeep = indexSettings.getAsInt("index.deletionpolicy.num_to_keep", 5); - logger.debug("Using [keep_last_n] deletion policy with num_to_keep[{}]", numToKeep); - } - - @Override - public void onInit(List commits) throws IOException { - // do no deletions on init - doDeletes(commits); - } - - @Override - public void onCommit(List commits) throws IOException { - doDeletes(commits); - } - - private void doDeletes(List commits) { - int size = commits.size(); - for (int i = 0; i < size - numToKeep; i++) { - commits.get(i).delete(); - } - } - -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java deleted file mode 100644 index 9a331c61868..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/KeepOnlyLastDeletionPolicy.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; - -import java.util.List; - -/** - * This {@link org.apache.lucene.index.IndexDeletionPolicy} implementation that - * keeps only the most recent commit and immediately removes - * all prior commits after a new commit is done. This is - * the default deletion policy. - */ -public class KeepOnlyLastDeletionPolicy extends AbstractESDeletionPolicy { - - @Inject - public KeepOnlyLastDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) { - super(shardId, indexSettings); - logger.debug("Using [keep_only_last] deletion policy"); - } - - /** - * Deletes all commits except the most recent one. - */ - @Override - public void onInit(List commits) { - // Note that commits.size() should normally be 1: - onCommit(commits); - } - - /** - * Deletes all commits except the most recent one. - */ - @Override - public void onCommit(List commits) { - // Note that commits.size() should normally be 2 (if not - // called by onInit above): - int size = commits.size(); - for (int i = 0; i < size - 1; i++) { - commits.get(i).delete(); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java deleted file mode 100644 index be261b1b5f2..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotDeletionPolicy.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexDeletionPolicy; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.name.Named; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.shard.IndexShardComponent; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -/** - * Snapshot deletion policy allows to get snapshots of an index state (last commit or all commits) - * and if the deletion policy is used with all open index writers (JVM level) then the snapshot - * state will not be deleted until it will be released. - * - * - */ -public class SnapshotDeletionPolicy extends AbstractESDeletionPolicy { - - private final IndexDeletionPolicy primary; - - private final ConcurrentMap snapshots = ConcurrentCollections.newConcurrentMap(); - - private volatile List commits; - - private final Object mutex = new Object(); - - private SnapshotIndexCommit lastCommit; - - /** - * Constructs a new snapshot deletion policy that wraps the provided deletion policy. - */ - @Inject - public SnapshotDeletionPolicy(@Named("actual") IndexDeletionPolicy primary) { - super(((IndexShardComponent) primary).shardId(), ((IndexShardComponent) primary).indexSettings()); - this.primary = primary; - } - - /** - * Called by Lucene. Same as {@link #onCommit(java.util.List)}. - */ - @Override - public void onInit(List commits) throws IOException { - if (!commits.isEmpty()) { // this might be empty if we create a new index. - // the behavior has changed in Lucene 4.4 that calls onInit even with an empty commits list. - onCommit(commits); - } - } - - /** - * Called by Lucene.. Wraps the provided commits with {@link SnapshotIndexCommit} - * and delegates to the wrapped deletion policy. - */ - @Override - public void onCommit(List commits) throws IOException { - assert !commits.isEmpty() : "Commits must not be empty"; - synchronized (mutex) { - List snapshotCommits = wrapCommits(commits); - primary.onCommit(snapshotCommits); - - // clean snapshots that their respective counts are 0 (should not really happen) - for (Iterator it = snapshots.values().iterator(); it.hasNext(); ) { - SnapshotHolder holder = it.next(); - if (holder.counter <= 0) { - it.remove(); - } - } - // build the current commits list (all the ones that are not deleted by the primary) - List newCommits = new ArrayList<>(); - for (SnapshotIndexCommit commit : snapshotCommits) { - if (!commit.isDeleted()) { - newCommits.add(commit); - } - } - this.commits = newCommits; - // the last commit that is not deleted - this.lastCommit = newCommits.get(newCommits.size() - 1); - - } - } - - /** - * Snapshots all the current commits in the index. Make sure to call - * {@link SnapshotIndexCommits#close()} to release it. - */ - public SnapshotIndexCommits snapshots() throws IOException { - synchronized (mutex) { - if (snapshots == null) { - throw new IllegalStateException("Snapshot deletion policy has not been init yet..."); - } - List result = new ArrayList<>(commits.size()); - for (SnapshotIndexCommit commit : commits) { - result.add(snapshot(commit)); - } - return new SnapshotIndexCommits(result); - } - } - - /** - * Returns a snapshot of the index (for the last commit point). Make - * sure to call {@link SnapshotIndexCommit#close()} in order to release it. - */ - public SnapshotIndexCommit snapshot() throws IOException { - synchronized (mutex) { - if (lastCommit == null) { - throw new IllegalStateException("Snapshot deletion policy has not been init yet..."); - } - return snapshot(lastCommit); - } - } - - @Override - public IndexDeletionPolicy clone() { - // Lucene IW makes a clone internally but since we hold on to this instance - // the clone will just be the identity. See InternalEngine recovery why we need this. - return this; - } - - /** - * Helper method to snapshot a give commit. - */ - private SnapshotIndexCommit snapshot(SnapshotIndexCommit commit) throws IOException { - SnapshotHolder snapshotHolder = snapshots.get(commit.getGeneration()); - if (snapshotHolder == null) { - snapshotHolder = new SnapshotHolder(0); - snapshots.put(commit.getGeneration(), snapshotHolder); - } - snapshotHolder.counter++; - return new OneTimeReleaseSnapshotIndexCommit(this, commit); - } - - /** - * Returns true if the version has been snapshotted. - */ - boolean isHeld(long version) { - SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version); - return holder != null && holder.counter > 0; - } - - /** - * Releases the version provided. Returns true if the release was successful. - */ - boolean close(long version) { - synchronized (mutex) { - SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version); - if (holder == null) { - return false; - } - if (holder.counter <= 0) { - snapshots.remove(version); - return false; - } - if (--holder.counter == 0) { - snapshots.remove(version); - } - return true; - } - } - - /** - * A class that wraps an {@link SnapshotIndexCommit} and makes sure that release will only - * be called once on it. - */ - private static class OneTimeReleaseSnapshotIndexCommit extends SnapshotIndexCommit { - private volatile boolean released = false; - - OneTimeReleaseSnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException { - super(deletionPolicy, cp); - } - - @Override - public void close() { - if (released) { - return; - } - released = true; - ((SnapshotIndexCommit) delegate).close(); - } - } - - private static class SnapshotHolder { - int counter; - - private SnapshotHolder(int counter) { - this.counter = counter; - } - } - - private List wrapCommits(List commits) throws IOException { - final int count = commits.size(); - List snapshotCommits = new ArrayList<>(count); - for (int i = 0; i < count; i++) - snapshotCommits.add(new SnapshotIndexCommit(this, commits.get(i))); - return snapshotCommits; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java deleted file mode 100644 index d598d53f751..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommit.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.apache.lucene.index.IndexCommit; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lucene.IndexCommitDelegate; - -import java.io.IOException; -import java.util.ArrayList; - -/** - * A snapshot index commit point. While this is held and {@link #close()} - * was not called, no files will be deleted that relates to this commit point - * ({@link #getFileNames()}). - * - * - */ -public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasable { - - private final SnapshotDeletionPolicy deletionPolicy; - - private final String[] files; - - SnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException { - super(cp); - this.deletionPolicy = deletionPolicy; - ArrayList tmpFiles = new ArrayList<>(); - for (String o : cp.getFileNames()) { - tmpFiles.add(o); - } - files = tmpFiles.toArray(new String[tmpFiles.size()]); - } - - public String[] getFiles() { - return files; - } - - /** - * Releases the current snapshot. - */ - @Override - public void close() { - deletionPolicy.close(getGeneration()); - } - - /** - * Override the delete operation, and only actually delete it if it - * is not held by the {@link SnapshotDeletionPolicy}. - */ - @Override - public void delete() { - if (!deletionPolicy.isHeld(getGeneration())) { - delegate.delete(); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java b/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java deleted file mode 100644 index 92e7dbebc3e..00000000000 --- a/core/src/main/java/org/elasticsearch/index/deletionpolicy/SnapshotIndexCommits.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.deletionpolicy; - -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; - -import java.util.Iterator; -import java.util.List; - -/** - * Represents a snapshot view of several commits. Provides a way to iterate over - * them as well as a simple method to release all of them. - * - * - */ -public class SnapshotIndexCommits implements Iterable, Releasable { - - private final List commits; - - public SnapshotIndexCommits(List commits) { - this.commits = commits; - } - - public int size() { - return commits.size(); - } - - @Override - public Iterator iterator() { - return commits.iterator(); - } - - @Override - public void close() { - Releasables.close(commits); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 0c66b5148c4..2c1e855e132 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -19,17 +19,7 @@ package org.elasticsearch.index.engine; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.SegmentReader; -import org.apache.lucene.index.Term; +import org.apache.lucene.index.*; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.SearcherManager; @@ -51,8 +41,6 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; @@ -516,7 +504,7 @@ public abstract class Engine implements Closeable { * * @param flushFirst indicates whether the engine should flush before returning the snapshot */ - public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException; + public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException; /** * fail engine due to some error. the engine will also be closed. diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index d18097953aa..04eba9fe347 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.similarities.Similarity; @@ -30,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; @@ -305,7 +305,7 @@ public final class EngineConfig { } /** - * Returns a {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy} used in the engines + * Returns a {@link SnapshotDeletionPolicy} used in the engines * {@link org.apache.lucene.index.IndexWriter}. */ public SnapshotDeletionPolicy getDeletionPolicy() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 773979dba5a..72c8a6d8b1d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -19,19 +19,8 @@ package org.elasticsearch.index.engine; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.*; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.LiveIndexWriterConfig; -import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.MultiReader; -import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; @@ -59,7 +48,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; @@ -885,7 +873,7 @@ public class InternalEngine extends Engine { } @Override - public SnapshotIndexCommit snapshotIndex(final boolean flushFirst) throws EngineException { + public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading // the to a write lock when we fail the engine in this operation if (flushFirst) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index f89b9ce471a..f589b289c17 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; @@ -29,7 +30,6 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -209,10 +209,12 @@ public class ShadowEngine extends Engine { } @Override - public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException { + public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { throw new UnsupportedOperationException("Can not take snapshot from a shadow engine"); } + + @Override protected SearcherManager getSearcherManager() { return searcherManager; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 683beacfa51..89419d90921 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -22,6 +22,9 @@ package org.elasticsearch.index.shard; import java.nio.charset.StandardCharsets; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; @@ -65,11 +68,8 @@ import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.ShardRequestCache; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; import org.elasticsearch.index.fielddata.FieldDataStats; -import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.flush.FlushStats; @@ -206,16 +206,15 @@ public class IndexShard extends AbstractIndexShardComponent { ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, - @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, + @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { super(shardId, indexSettingsService.getSettings()); this.codecService = codecService; this.warmer = warmer; - this.deletionPolicy = deletionPolicy; + this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.similarityService = similarityService; this.wrappingService = wrappingService; Objects.requireNonNull(store, "Store must be provided to the index shard"); - Objects.requireNonNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard"); this.engineFactory = factory; this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService = indexSettingsService; @@ -745,7 +744,13 @@ public class IndexShard extends AbstractIndexShardComponent { return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion; } - public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException { + /** + * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this + * commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}. + * + * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed + */ + public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { @@ -755,6 +760,15 @@ public class IndexShard extends AbstractIndexShardComponent { } } + + /** + * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources + * referenced by the given snapshot {@link IndexCommit}. + */ + public void releaseSnapshot(IndexCommit snapshot) throws IOException { + deletionPolicy.release(snapshot); + } + /** * Fails the shard and marks the shard store as corrupted if * e is caused by index corruption diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 6c45331f826..da5cef0f1b1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.engine.IndexSearcherWrappingService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -35,7 +34,6 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.similarity.SimilarityService; @@ -64,14 +62,14 @@ public final class ShadowIndexShard extends IndexShard { IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, @Nullable IndicesWarmer warmer, - SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, + SimilarityService similarityService, EngineFactory factory, ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indicesQueryCache, codecService, termVectorsService, indexFieldDataService, indexService, - warmer, deletionPolicy, similarityService, + warmer, similarityService, factory, clusterService, path, bigArrays, wrappingService); } diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java index 8ce487fe144..ca481e1430d 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java @@ -19,9 +19,9 @@ package org.elasticsearch.index.snapshots; +import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.SnapshotId; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState; @@ -47,7 +47,7 @@ public interface IndexShardRepository { * @param snapshotIndexCommit commit point * @param snapshotStatus snapshot status */ - void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); + void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); /** * Restores snapshot of the shard. diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 912be76fb81..c9344d382c8 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -19,10 +19,7 @@ package org.elasticsearch.index.snapshots.blobstore; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.*; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; @@ -49,7 +46,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.*; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -150,12 +146,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements this.snapshotRateLimiter = snapshotRateLimiter; this.restoreRateLimiter = restoreRateLimiter; this.rateLimiterListener = rateLimiterListener; - this.snapshotThrottleListener = new RateLimitingInputStream.Listener() { - @Override - public void onPause(long nanos) { - rateLimiterListener.onSnapshotPause(nanos); - } - }; + this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos); this.compress = compress; indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); @@ -166,7 +157,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * {@inheritDoc} */ @Override - public void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus); snapshotStatus.startTime(System.currentTimeMillis()); @@ -495,7 +486,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { super(snapshotId, Version.CURRENT, shardId); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class); + store = indexService.shard(shardId.id()).store(); this.snapshotStatus = snapshotStatus; } @@ -505,7 +496,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * * @param snapshotIndexCommit snapshot commit point */ - public void snapshot(SnapshotIndexCommit snapshotIndexCommit) { + public void snapshot(IndexCommit snapshotIndexCommit) { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName); store.incRef(); try { @@ -528,12 +519,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements ArrayList filesToSnapshot = new ArrayList<>(); final Store.MetadataSnapshot metadata; // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + final Collection fileNames; try { metadata = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } - for (String fileName : snapshotIndexCommit.getFiles()) { + for (String fileName : fileNames) { if (snapshotStatus.aborted()) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); throw new IndexShardSnapshotFailedException(shardId, "Aborted"); @@ -776,7 +769,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements */ public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { super(snapshotId, version, shardId, snapshotShardId); - store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class); + store = indicesService.indexServiceSafe(shardId.getIndex()).shard(shardId.id()).store(); this.recoveryState = recoveryState; } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 102fa98cedd..65b58868964 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.recovery; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.IOContext; @@ -33,14 +34,12 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.shard.IllegalIndexShardStateException; @@ -122,7 +121,7 @@ public class RecoverySourceHandler { assert engine.getTranslog() != null : "translog must not be null"; try (Translog.View translogView = engine.getTranslog().newView()) { logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration()); - final SnapshotIndexCommit phase1Snapshot; + final IndexCommit phase1Snapshot; try { phase1Snapshot = shard.snapshotIndex(false); } catch (Throwable e) { @@ -135,7 +134,11 @@ public class RecoverySourceHandler { } catch (Throwable e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { - Releasables.closeWhileHandlingException(phase1Snapshot); + try { + shard.releaseSnapshot(phase1Snapshot); + } catch (IOException ex) { + logger.warn("releasing snapshot caused exception", ex); + } } logger.trace("snapshot translog for recovery. current size is [{}]", translogView.totalOperations()); @@ -151,7 +154,7 @@ public class RecoverySourceHandler { } /** - * Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit} + * Perform phase1 of the recovery operations. Once this {@link IndexCommit} * snapshot has been performed no commit operations (files being fsync'd) * are effectively allowed on this index until all recovery phases are done *

@@ -159,7 +162,7 @@ public class RecoverySourceHandler { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public void phase1(final SnapshotIndexCommit snapshot, final Translog.View translogView) { + public void phase1(final IndexCommit snapshot, final Translog.View translogView) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; @@ -176,7 +179,7 @@ public class RecoverySourceHandler { shard.engine().failEngine("recovery", ex); throw ex; } - for (String name : snapshot.getFiles()) { + for (String name : snapshot.getFileNames()) { final StoreFileMetaData md = recoverySourceMetadata.get(name); if (md == null) { logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap()); diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index a766c3a4ff2..294b36df491 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -18,9 +18,12 @@ */ package org.elasticsearch.repositories; +import org.apache.lucene.index.IndexCommit; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotShardFailure; @@ -38,7 +41,7 @@ import java.util.List; *

    *
  • Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)} * with list of indices that will be included into the snapshot
  • - *
  • Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard
  • + *
  • Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(SnapshotId, ShardId, IndexCommit, IndexShardSnapshotStatus)} for each shard
  • *
  • When all shard calls return master calls {@link #finalizeSnapshot} * with possible list of failures
  • *
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 3502916aae3..3850888d848 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -20,6 +20,7 @@ package org.elasticsearch.snapshots; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.index.IndexCommit; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; @@ -38,7 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -335,7 +335,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent { - - @Override - public boolean matchesSafely(SnapshotIndexCommit snapshotIndexCommit) { - try { - HashSet files = Sets.newHashSet(snapshotIndexCommit.getDirectory().listAll()); - for (String fileName : snapshotIndexCommit.getFiles()) { - if (files.contains(fileName) == false) { - return false; - } - } - } catch (IOException ex) { - throw new RuntimeException(ex); - } - return true; - } - - @Override - public void describeTo(Description description) { - description.appendText("an index commit existence"); - } - - public static Matcher snapshotIndexCommitExists() { - return new SnapshotIndexCommitExistsMatcher(); - } -} diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3381102a419..c775639044a 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -60,8 +60,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.*; @@ -231,15 +229,10 @@ public class InternalEngineTests extends ESTestCase { return new Translog(translogConfig); } - protected IndexDeletionPolicy createIndexDeletionPolicy() { - return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS); - } - protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() { - return new SnapshotDeletionPolicy(createIndexDeletionPolicy()); + return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); } - protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) { return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 79f5e07948f..a6ca90a73db 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.Index; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; @@ -185,15 +183,10 @@ public class ShadowEngineTests extends ESTestCase { return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId)); } - protected IndexDeletionPolicy createIndexDeletionPolicy() { - return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS); - } - protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() { - return new SnapshotDeletionPolicy(createIndexDeletionPolicy()); + return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); } - protected ShadowEngine createShadowEngine(Store store) { return createShadowEngine(defaultSettings, store); } diff --git a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java index 5711d6a217c..18ba33ff8c2 100644 --- a/core/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; -import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -1144,7 +1142,7 @@ public class StoreTests extends ESTestCase { DirectoryService directoryService = new LuceneManagedDirectoryService(random()); Store store = new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId)); IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()); - SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS)); + SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); config.setIndexDeletionPolicy(deletionPolicy); IndexWriter writer = new IndexWriter(store.directory(), config); Document doc = new Document();