Remove ES internal deletion policies in favour of Lucenes implementations
These classes are really duplicates and are just here for historical reasons. We don't need these anymore since the same classes exist in lucene today. This also removes the guice injection for DeletionPolicy and make them shard private.
This commit is contained in:
parent
eaafc62f53
commit
d5d4c9b140
|
@ -40,7 +40,6 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
|
|||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
|
@ -365,8 +364,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
injector.getInstance(IndicesQueryCache.class).onClose(shardId);
|
||||
}
|
||||
}), path));
|
||||
modules.add(new DeletionPolicyModule());
|
||||
|
||||
pluginsService.processModules(modules);
|
||||
|
||||
try {
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexDeletionPolicy;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
abstract class AbstractESDeletionPolicy extends IndexDeletionPolicy implements IndexShardComponent {
|
||||
|
||||
protected final ESLogger logger;
|
||||
|
||||
protected final ShardId shardId;
|
||||
|
||||
protected final Settings indexSettings;
|
||||
|
||||
protected AbstractESDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
this.shardId = shardId;
|
||||
this.indexSettings = indexSettings;
|
||||
this.logger = Loggers.getLogger(getClass(), indexSettings, shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardId shardId() {
|
||||
return this.shardId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings indexSettings() {
|
||||
return this.indexSettings;
|
||||
}
|
||||
|
||||
public String nodeName() {
|
||||
return indexSettings.get("name", "");
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexDeletionPolicy;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.name.Names;
|
||||
|
||||
public class DeletionPolicyModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(IndexDeletionPolicy.class)
|
||||
.annotatedWith(Names.named("actual"))
|
||||
.to(KeepOnlyLastDeletionPolicy.class)
|
||||
.asEagerSingleton();
|
||||
|
||||
bind(SnapshotDeletionPolicy.class)
|
||||
.asEagerSingleton();
|
||||
}
|
||||
}
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class KeepLastNDeletionPolicy extends AbstractESDeletionPolicy {
|
||||
|
||||
private final int numToKeep;
|
||||
|
||||
@Inject
|
||||
public KeepLastNDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
super(shardId, indexSettings);
|
||||
this.numToKeep = indexSettings.getAsInt("index.deletionpolicy.num_to_keep", 5);
|
||||
logger.debug("Using [keep_last_n] deletion policy with num_to_keep[{}]", numToKeep);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit(List<? extends IndexCommit> commits) throws IOException {
|
||||
// do no deletions on init
|
||||
doDeletes(commits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
|
||||
doDeletes(commits);
|
||||
}
|
||||
|
||||
private void doDeletes(List<? extends IndexCommit> commits) {
|
||||
int size = commits.size();
|
||||
for (int i = 0; i < size - numToKeep; i++) {
|
||||
commits.get(i).delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This {@link org.apache.lucene.index.IndexDeletionPolicy} implementation that
|
||||
* keeps only the most recent commit and immediately removes
|
||||
* all prior commits after a new commit is done. This is
|
||||
* the default deletion policy.
|
||||
*/
|
||||
public class KeepOnlyLastDeletionPolicy extends AbstractESDeletionPolicy {
|
||||
|
||||
@Inject
|
||||
public KeepOnlyLastDeletionPolicy(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
super(shardId, indexSettings);
|
||||
logger.debug("Using [keep_only_last] deletion policy");
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all commits except the most recent one.
|
||||
*/
|
||||
@Override
|
||||
public void onInit(List<? extends IndexCommit> commits) {
|
||||
// Note that commits.size() should normally be 1:
|
||||
onCommit(commits);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all commits except the most recent one.
|
||||
*/
|
||||
@Override
|
||||
public void onCommit(List<? extends IndexCommit> commits) {
|
||||
// Note that commits.size() should normally be 2 (if not
|
||||
// called by onInit above):
|
||||
int size = commits.size();
|
||||
for (int i = 0; i < size - 1; i++) {
|
||||
commits.get(i).delete();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,220 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexDeletionPolicy;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.name.Named;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* Snapshot deletion policy allows to get snapshots of an index state (last commit or all commits)
|
||||
* and if the deletion policy is used with all open index writers (JVM level) then the snapshot
|
||||
* state will not be deleted until it will be released.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class SnapshotDeletionPolicy extends AbstractESDeletionPolicy {
|
||||
|
||||
private final IndexDeletionPolicy primary;
|
||||
|
||||
private final ConcurrentMap<Long, SnapshotHolder> snapshots = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
private volatile List<SnapshotIndexCommit> commits;
|
||||
|
||||
private final Object mutex = new Object();
|
||||
|
||||
private SnapshotIndexCommit lastCommit;
|
||||
|
||||
/**
|
||||
* Constructs a new snapshot deletion policy that wraps the provided deletion policy.
|
||||
*/
|
||||
@Inject
|
||||
public SnapshotDeletionPolicy(@Named("actual") IndexDeletionPolicy primary) {
|
||||
super(((IndexShardComponent) primary).shardId(), ((IndexShardComponent) primary).indexSettings());
|
||||
this.primary = primary;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by Lucene. Same as {@link #onCommit(java.util.List)}.
|
||||
*/
|
||||
@Override
|
||||
public void onInit(List<? extends IndexCommit> commits) throws IOException {
|
||||
if (!commits.isEmpty()) { // this might be empty if we create a new index.
|
||||
// the behavior has changed in Lucene 4.4 that calls onInit even with an empty commits list.
|
||||
onCommit(commits);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by Lucene.. Wraps the provided commits with {@link SnapshotIndexCommit}
|
||||
* and delegates to the wrapped deletion policy.
|
||||
*/
|
||||
@Override
|
||||
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
|
||||
assert !commits.isEmpty() : "Commits must not be empty";
|
||||
synchronized (mutex) {
|
||||
List<SnapshotIndexCommit> snapshotCommits = wrapCommits(commits);
|
||||
primary.onCommit(snapshotCommits);
|
||||
|
||||
// clean snapshots that their respective counts are 0 (should not really happen)
|
||||
for (Iterator<SnapshotHolder> it = snapshots.values().iterator(); it.hasNext(); ) {
|
||||
SnapshotHolder holder = it.next();
|
||||
if (holder.counter <= 0) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
// build the current commits list (all the ones that are not deleted by the primary)
|
||||
List<SnapshotIndexCommit> newCommits = new ArrayList<>();
|
||||
for (SnapshotIndexCommit commit : snapshotCommits) {
|
||||
if (!commit.isDeleted()) {
|
||||
newCommits.add(commit);
|
||||
}
|
||||
}
|
||||
this.commits = newCommits;
|
||||
// the last commit that is not deleted
|
||||
this.lastCommit = newCommits.get(newCommits.size() - 1);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshots all the current commits in the index. Make sure to call
|
||||
* {@link SnapshotIndexCommits#close()} to release it.
|
||||
*/
|
||||
public SnapshotIndexCommits snapshots() throws IOException {
|
||||
synchronized (mutex) {
|
||||
if (snapshots == null) {
|
||||
throw new IllegalStateException("Snapshot deletion policy has not been init yet...");
|
||||
}
|
||||
List<SnapshotIndexCommit> result = new ArrayList<>(commits.size());
|
||||
for (SnapshotIndexCommit commit : commits) {
|
||||
result.add(snapshot(commit));
|
||||
}
|
||||
return new SnapshotIndexCommits(result);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot of the index (for the last commit point). Make
|
||||
* sure to call {@link SnapshotIndexCommit#close()} in order to release it.
|
||||
*/
|
||||
public SnapshotIndexCommit snapshot() throws IOException {
|
||||
synchronized (mutex) {
|
||||
if (lastCommit == null) {
|
||||
throw new IllegalStateException("Snapshot deletion policy has not been init yet...");
|
||||
}
|
||||
return snapshot(lastCommit);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexDeletionPolicy clone() {
|
||||
// Lucene IW makes a clone internally but since we hold on to this instance
|
||||
// the clone will just be the identity. See InternalEngine recovery why we need this.
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to snapshot a give commit.
|
||||
*/
|
||||
private SnapshotIndexCommit snapshot(SnapshotIndexCommit commit) throws IOException {
|
||||
SnapshotHolder snapshotHolder = snapshots.get(commit.getGeneration());
|
||||
if (snapshotHolder == null) {
|
||||
snapshotHolder = new SnapshotHolder(0);
|
||||
snapshots.put(commit.getGeneration(), snapshotHolder);
|
||||
}
|
||||
snapshotHolder.counter++;
|
||||
return new OneTimeReleaseSnapshotIndexCommit(this, commit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if the version has been snapshotted.
|
||||
*/
|
||||
boolean isHeld(long version) {
|
||||
SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version);
|
||||
return holder != null && holder.counter > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the version provided. Returns <tt>true</tt> if the release was successful.
|
||||
*/
|
||||
boolean close(long version) {
|
||||
synchronized (mutex) {
|
||||
SnapshotDeletionPolicy.SnapshotHolder holder = snapshots.get(version);
|
||||
if (holder == null) {
|
||||
return false;
|
||||
}
|
||||
if (holder.counter <= 0) {
|
||||
snapshots.remove(version);
|
||||
return false;
|
||||
}
|
||||
if (--holder.counter == 0) {
|
||||
snapshots.remove(version);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A class that wraps an {@link SnapshotIndexCommit} and makes sure that release will only
|
||||
* be called once on it.
|
||||
*/
|
||||
private static class OneTimeReleaseSnapshotIndexCommit extends SnapshotIndexCommit {
|
||||
private volatile boolean released = false;
|
||||
|
||||
OneTimeReleaseSnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException {
|
||||
super(deletionPolicy, cp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (released) {
|
||||
return;
|
||||
}
|
||||
released = true;
|
||||
((SnapshotIndexCommit) delegate).close();
|
||||
}
|
||||
}
|
||||
|
||||
private static class SnapshotHolder {
|
||||
int counter;
|
||||
|
||||
private SnapshotHolder(int counter) {
|
||||
this.counter = counter;
|
||||
}
|
||||
}
|
||||
|
||||
private List<SnapshotIndexCommit> wrapCommits(List<? extends IndexCommit> commits) throws IOException {
|
||||
final int count = commits.size();
|
||||
List<SnapshotIndexCommit> snapshotCommits = new ArrayList<>(count);
|
||||
for (int i = 0; i < count; i++)
|
||||
snapshotCommits.add(new SnapshotIndexCommit(this, commits.get(i)));
|
||||
return snapshotCommits;
|
||||
}
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.IndexCommitDelegate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* A snapshot index commit point. While this is held and {@link #close()}
|
||||
* was not called, no files will be deleted that relates to this commit point
|
||||
* ({@link #getFileNames()}).
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasable {
|
||||
|
||||
private final SnapshotDeletionPolicy deletionPolicy;
|
||||
|
||||
private final String[] files;
|
||||
|
||||
SnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException {
|
||||
super(cp);
|
||||
this.deletionPolicy = deletionPolicy;
|
||||
ArrayList<String> tmpFiles = new ArrayList<>();
|
||||
for (String o : cp.getFileNames()) {
|
||||
tmpFiles.add(o);
|
||||
}
|
||||
files = tmpFiles.toArray(new String[tmpFiles.size()]);
|
||||
}
|
||||
|
||||
public String[] getFiles() {
|
||||
return files;
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the current snapshot.
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
deletionPolicy.close(getGeneration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the delete operation, and only actually delete it if it
|
||||
* is not held by the {@link SnapshotDeletionPolicy}.
|
||||
*/
|
||||
@Override
|
||||
public void delete() {
|
||||
if (!deletionPolicy.isHeld(getGeneration())) {
|
||||
delegate.delete();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Represents a snapshot view of several commits. Provides a way to iterate over
|
||||
* them as well as a simple method to release all of them.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class SnapshotIndexCommits implements Iterable<SnapshotIndexCommit>, Releasable {
|
||||
|
||||
private final List<SnapshotIndexCommit> commits;
|
||||
|
||||
public SnapshotIndexCommits(List<SnapshotIndexCommit> commits) {
|
||||
this.commits = commits;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return commits.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<SnapshotIndexCommit> iterator() {
|
||||
return commits.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Releasables.close(commits);
|
||||
}
|
||||
}
|
|
@ -19,17 +19,7 @@
|
|||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterLeafReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
|
@ -51,8 +41,6 @@ import org.elasticsearch.common.lucene.Lucene;
|
|||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
|
@ -516,7 +504,7 @@ public abstract class Engine implements Closeable {
|
|||
*
|
||||
* @param flushFirst indicates whether the engine should flush before returning the snapshot
|
||||
*/
|
||||
public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
|
||||
public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
|
||||
|
||||
/**
|
||||
* fail engine due to some error. the engine will also be closed.
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
|
|||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.index.MergePolicy;
|
||||
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
||||
import org.apache.lucene.search.QueryCache;
|
||||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.similarities.Similarity;
|
||||
|
@ -30,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.shard.MergeSchedulerConfig;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -305,7 +305,7 @@ public final class EngineConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy} used in the engines
|
||||
* Returns a {@link SnapshotDeletionPolicy} used in the engines
|
||||
* {@link org.apache.lucene.index.IndexWriter}.
|
||||
*/
|
||||
public SnapshotDeletionPolicy getDeletionPolicy() {
|
||||
|
|
|
@ -19,19 +19,8 @@
|
|||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.LiveIndexWriterConfig;
|
||||
import org.apache.lucene.index.MergePolicy;
|
||||
import org.apache.lucene.index.MultiReader;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
|
@ -59,7 +48,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
|
@ -885,7 +873,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SnapshotIndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
|
||||
public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
|
||||
// we have to flush outside of the readlock otherwise we might have a problem upgrading
|
||||
// the to a write lock when we fail the engine in this operation
|
||||
if (flushFirst) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
|
@ -29,7 +30,6 @@ import org.elasticsearch.common.lucene.Lucene;
|
|||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -209,10 +209,12 @@ public class ShadowEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected SearcherManager getSearcherManager() {
|
||||
return searcherManager;
|
||||
|
|
|
@ -22,6 +22,9 @@ package org.elasticsearch.index.shard;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.index.CheckIndex;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
|
||||
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
||||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
|
@ -65,11 +68,8 @@ import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
|
|||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.cache.request.ShardRequestCache;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.engine.*;
|
||||
import org.elasticsearch.index.fielddata.FieldDataStats;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.fielddata.ShardFieldData;
|
||||
import org.elasticsearch.index.flush.FlushStats;
|
||||
|
@ -206,16 +206,15 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
|
||||
IndicesQueryCache indicesQueryCache, CodecService codecService,
|
||||
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService,
|
||||
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory,
|
||||
@Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
|
||||
ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
|
||||
super(shardId, indexSettingsService.getSettings());
|
||||
this.codecService = codecService;
|
||||
this.warmer = warmer;
|
||||
this.deletionPolicy = deletionPolicy;
|
||||
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||
this.similarityService = similarityService;
|
||||
this.wrappingService = wrappingService;
|
||||
Objects.requireNonNull(store, "Store must be provided to the index shard");
|
||||
Objects.requireNonNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
|
||||
this.engineFactory = factory;
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||
this.indexSettingsService = indexSettingsService;
|
||||
|
@ -745,7 +744,13 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion;
|
||||
}
|
||||
|
||||
public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
/**
|
||||
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
|
||||
* commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}.
|
||||
*
|
||||
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
|
||||
*/
|
||||
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
IndexShardState state = this.state; // one time volatile read
|
||||
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
|
||||
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
|
||||
|
@ -755,6 +760,15 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
|
||||
* referenced by the given snapshot {@link IndexCommit}.
|
||||
*/
|
||||
public void releaseSnapshot(IndexCommit snapshot) throws IOException {
|
||||
deletionPolicy.release(snapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fails the shard and marks the shard store as corrupted if
|
||||
* <code>e</code> is caused by index corruption
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.aliases.IndexAliasesService;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
|
@ -35,7 +34,6 @@ import org.elasticsearch.index.engine.EngineFactory;
|
|||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
|
@ -64,14 +62,14 @@ public final class ShadowIndexShard extends IndexShard {
|
|||
IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache,
|
||||
CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
|
||||
IndexService indexService, @Nullable IndicesWarmer warmer,
|
||||
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
|
||||
SimilarityService similarityService,
|
||||
EngineFactory factory, ClusterService clusterService,
|
||||
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
|
||||
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
|
||||
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
|
||||
indicesQueryCache, codecService,
|
||||
termVectorsService, indexFieldDataService, indexService,
|
||||
warmer, deletionPolicy, similarityService,
|
||||
warmer, similarityService,
|
||||
factory, clusterService, path, bigArrays, wrappingService);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,9 @@
|
|||
|
||||
package org.elasticsearch.index.snapshots;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
|
||||
|
@ -47,7 +47,7 @@ public interface IndexShardRepository {
|
|||
* @param snapshotIndexCommit commit point
|
||||
* @param snapshotStatus snapshot status
|
||||
*/
|
||||
void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
|
||||
void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
|
||||
|
||||
/**
|
||||
* Restores snapshot of the shard.
|
||||
|
|
|
@ -19,10 +19,7 @@
|
|||
|
||||
package org.elasticsearch.index.snapshots.blobstore;
|
||||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
|
@ -49,7 +46,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.*;
|
||||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
|
||||
|
@ -150,12 +146,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
this.snapshotRateLimiter = snapshotRateLimiter;
|
||||
this.restoreRateLimiter = restoreRateLimiter;
|
||||
this.rateLimiterListener = rateLimiterListener;
|
||||
this.snapshotThrottleListener = new RateLimitingInputStream.Listener() {
|
||||
@Override
|
||||
public void onPause(long nanos) {
|
||||
rateLimiterListener.onSnapshotPause(nanos);
|
||||
}
|
||||
};
|
||||
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
|
||||
this.compress = compress;
|
||||
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
|
||||
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
|
||||
|
@ -166,7 +157,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void snapshot(SnapshotId snapshotId, ShardId shardId, SnapshotIndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
|
||||
public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
|
||||
SnapshotContext snapshotContext = new SnapshotContext(snapshotId, shardId, snapshotStatus);
|
||||
snapshotStatus.startTime(System.currentTimeMillis());
|
||||
|
||||
|
@ -495,7 +486,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
|
||||
super(snapshotId, Version.CURRENT, shardId);
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class);
|
||||
store = indexService.shard(shardId.id()).store();
|
||||
this.snapshotStatus = snapshotStatus;
|
||||
|
||||
}
|
||||
|
@ -505,7 +496,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
*
|
||||
* @param snapshotIndexCommit snapshot commit point
|
||||
*/
|
||||
public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
|
||||
public void snapshot(IndexCommit snapshotIndexCommit) {
|
||||
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
|
||||
store.incRef();
|
||||
try {
|
||||
|
@ -528,12 +519,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
ArrayList<FileInfo> filesToSnapshot = new ArrayList<>();
|
||||
final Store.MetadataSnapshot metadata;
|
||||
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
|
||||
final Collection<String> fileNames;
|
||||
try {
|
||||
metadata = store.getMetadata(snapshotIndexCommit);
|
||||
fileNames = snapshotIndexCommit.getFileNames();
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
|
||||
}
|
||||
for (String fileName : snapshotIndexCommit.getFiles()) {
|
||||
for (String fileName : fileNames) {
|
||||
if (snapshotStatus.aborted()) {
|
||||
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
|
||||
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
|
||||
|
@ -776,7 +769,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
*/
|
||||
public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
super(snapshotId, version, shardId, snapshotShardId);
|
||||
store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class);
|
||||
store = indicesService.indexServiceSafe(shardId.getIndex()).shard(shardId.id()).store();
|
||||
this.recoveryState = recoveryState;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
|
@ -33,14 +34,12 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
|
@ -122,7 +121,7 @@ public class RecoverySourceHandler {
|
|||
assert engine.getTranslog() != null : "translog must not be null";
|
||||
try (Translog.View translogView = engine.getTranslog().newView()) {
|
||||
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
|
||||
final SnapshotIndexCommit phase1Snapshot;
|
||||
final IndexCommit phase1Snapshot;
|
||||
try {
|
||||
phase1Snapshot = shard.snapshotIndex(false);
|
||||
} catch (Throwable e) {
|
||||
|
@ -135,7 +134,11 @@ public class RecoverySourceHandler {
|
|||
} catch (Throwable e) {
|
||||
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
|
||||
} finally {
|
||||
Releasables.closeWhileHandlingException(phase1Snapshot);
|
||||
try {
|
||||
shard.releaseSnapshot(phase1Snapshot);
|
||||
} catch (IOException ex) {
|
||||
logger.warn("releasing snapshot caused exception", ex);
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace("snapshot translog for recovery. current size is [{}]", translogView.totalOperations());
|
||||
|
@ -151,7 +154,7 @@ public class RecoverySourceHandler {
|
|||
}
|
||||
|
||||
/**
|
||||
* Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit}
|
||||
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
|
||||
* snapshot has been performed no commit operations (files being fsync'd)
|
||||
* are effectively allowed on this index until all recovery phases are done
|
||||
* <p>
|
||||
|
@ -159,7 +162,7 @@ public class RecoverySourceHandler {
|
|||
* segments that are missing. Only segments that have the same size and
|
||||
* checksum can be reused
|
||||
*/
|
||||
public void phase1(final SnapshotIndexCommit snapshot, final Translog.View translogView) {
|
||||
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
|
||||
cancellableThreads.checkForCancel();
|
||||
// Total size of segment files that are recovered
|
||||
long totalSize = 0;
|
||||
|
@ -176,7 +179,7 @@ public class RecoverySourceHandler {
|
|||
shard.engine().failEngine("recovery", ex);
|
||||
throw ex;
|
||||
}
|
||||
for (String name : snapshot.getFiles()) {
|
||||
for (String name : snapshot.getFileNames()) {
|
||||
final StoreFileMetaData md = recoverySourceMetadata.get(name);
|
||||
if (md == null) {
|
||||
logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
|
||||
|
|
|
@ -18,9 +18,12 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
|
@ -38,7 +41,7 @@ import java.util.List;
|
|||
* <ul>
|
||||
* <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
|
||||
* with list of indices that will be included into the snapshot</li>
|
||||
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li>
|
||||
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(SnapshotId, ShardId, IndexCommit, IndexShardSnapshotStatus)} for each shard</li>
|
||||
* <li>When all shard calls return master calls {@link #finalizeSnapshot}
|
||||
* with possible list of failures</li>
|
||||
* </ul>
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.snapshots;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -38,7 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
|
@ -335,7 +335,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
|
|||
|
||||
try {
|
||||
// we flush first to make sure we get the latest writes snapshotted
|
||||
SnapshotIndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
|
||||
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
|
||||
try {
|
||||
indexShardRepository.snapshot(snapshotId, shardId, snapshotIndexCommit, snapshotStatus);
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
@ -345,7 +345,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
|
|||
logger.debug(sb.toString());
|
||||
}
|
||||
} finally {
|
||||
snapshotIndexCommit.close();
|
||||
indexShard.releaseSnapshot(snapshotIndexCommit);
|
||||
}
|
||||
} catch (SnapshotFailedEngineException e) {
|
||||
throw e;
|
||||
|
|
|
@ -1,180 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.lucene.index.DirectoryReader.listCommits;
|
||||
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
* A set of tests for {@link org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy}.
|
||||
*/
|
||||
public class SnapshotDeletionPolicyTests extends ESTestCase {
|
||||
|
||||
protected final ShardId shardId = new ShardId(new Index("index"), 1);
|
||||
|
||||
private RAMDirectory dir;
|
||||
private SnapshotDeletionPolicy deletionPolicy;
|
||||
private IndexWriter indexWriter;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
dir = new RAMDirectory();
|
||||
deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
|
||||
indexWriter = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
|
||||
.setIndexDeletionPolicy(deletionPolicy)
|
||||
.setOpenMode(IndexWriterConfig.OpenMode.CREATE));
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
indexWriter.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
private Document testDocument() {
|
||||
Document document = new Document();
|
||||
document.add(new TextField("test", "1", Field.Store.YES));
|
||||
return document;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSnapshot() throws Exception {
|
||||
// add a document and commit, resulting in one commit point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// add another document and commit, resulting again in one commit point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// snapshot the last commit, and then add a document and commit, now we should have two commit points
|
||||
SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(2));
|
||||
|
||||
// release the commit, add a document and commit, now we should be back to one commit point
|
||||
snapshot.close();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiSnapshot() throws Exception {
|
||||
// add a document and commit, resulting in one commit point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// take two snapshots
|
||||
SnapshotIndexCommit snapshot1 = deletionPolicy.snapshot();
|
||||
SnapshotIndexCommit snapshot2 = deletionPolicy.snapshot();
|
||||
|
||||
// we should have two commits points
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(2));
|
||||
|
||||
// release one snapshot, we should still have two commit points
|
||||
snapshot1.close();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(2));
|
||||
|
||||
// release the second snapshot, we should be back to one commit
|
||||
snapshot2.close();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiReleaseException() throws Exception {
|
||||
// add a document and commit, resulting in one commit point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// snapshot the last commit, and release it twice, the seconds should throw an exception
|
||||
SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
|
||||
snapshot.close();
|
||||
snapshot.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSnapshots() throws Exception {
|
||||
// add a document and commit, resulting in one commit point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// add another document and commit, resulting again in one commint point
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
|
||||
// snapshot the last commit, and then add a document and commit, now we should have two commit points
|
||||
SnapshotIndexCommit snapshot = deletionPolicy.snapshot();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(2));
|
||||
|
||||
// now, take a snapshot of all the commits
|
||||
SnapshotIndexCommits snapshots = deletionPolicy.snapshots();
|
||||
assertThat(snapshots.size(), equalTo(2));
|
||||
|
||||
// release the snapshot, add a document and commit
|
||||
// we should have 3 commits points since we are holding onto the first two with snapshots
|
||||
// and we are using the keep only last
|
||||
snapshot.close();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(3));
|
||||
|
||||
// now release the snapshots, we should be back to a single commit point
|
||||
snapshots.close();
|
||||
indexWriter.addDocument(testDocument());
|
||||
indexWriter.commit();
|
||||
assertThat(listCommits(dir).size(), equalTo(1));
|
||||
}
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class SnapshotIndexCommitExistsMatcher extends TypeSafeMatcher<SnapshotIndexCommit> {
|
||||
|
||||
@Override
|
||||
public boolean matchesSafely(SnapshotIndexCommit snapshotIndexCommit) {
|
||||
try {
|
||||
HashSet<String> files = Sets.newHashSet(snapshotIndexCommit.getDirectory().listAll());
|
||||
for (String fileName : snapshotIndexCommit.getFiles()) {
|
||||
if (files.contains(fileName) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("an index commit existence");
|
||||
}
|
||||
|
||||
public static Matcher<SnapshotIndexCommit> snapshotIndexCommitExists() {
|
||||
return new SnapshotIndexCommitExistsMatcher();
|
||||
}
|
||||
}
|
|
@ -60,8 +60,6 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.engine.Engine.Searcher;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
|
@ -231,15 +229,10 @@ public class InternalEngineTests extends ESTestCase {
|
|||
return new Translog(translogConfig);
|
||||
}
|
||||
|
||||
protected IndexDeletionPolicy createIndexDeletionPolicy() {
|
||||
return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
|
||||
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
|
||||
return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||
}
|
||||
|
||||
|
||||
protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) {
|
||||
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers);
|
||||
}
|
||||
|
|
|
@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
|
@ -185,15 +183,10 @@ public class ShadowEngineTests extends ESTestCase {
|
|||
return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId));
|
||||
}
|
||||
|
||||
protected IndexDeletionPolicy createIndexDeletionPolicy() {
|
||||
return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
|
||||
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
|
||||
return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||
}
|
||||
|
||||
|
||||
protected ShadowEngine createShadowEngine(Store store) {
|
||||
return createShadowEngine(defaultSettings, store);
|
||||
}
|
||||
|
|
|
@ -40,8 +40,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -1144,7 +1142,7 @@ public class StoreTests extends ESTestCase {
|
|||
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
|
||||
Store store = new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
|
||||
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
|
||||
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
|
||||
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||
config.setIndexDeletionPolicy(deletionPolicy);
|
||||
IndexWriter writer = new IndexWriter(store.directory(), config);
|
||||
Document doc = new Document();
|
||||
|
|
Loading…
Reference in New Issue