* none of this is used anywhere
This commit is contained in:
parent
dc26521b0f
commit
f5efafd4d6
|
@ -362,7 +362,6 @@ final class StoreRecovery {
|
|||
final RecoveryState recoveryState = indexShard.recoveryState();
|
||||
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
|
||||
indexShard.prepareForIndexRecovery();
|
||||
long version = -1;
|
||||
SegmentInfos si = null;
|
||||
final Store store = indexShard.store();
|
||||
store.incRef();
|
||||
|
@ -384,21 +383,16 @@ final class StoreRecovery {
|
|||
"shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
|
||||
}
|
||||
}
|
||||
if (si != null) {
|
||||
if (indexShouldExists) {
|
||||
version = si.getVersion();
|
||||
} else {
|
||||
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
|
||||
// its a "new index create" API, we have to do something, so better to clean it than use same data
|
||||
logger.trace("cleaning existing shard, shouldn't exists");
|
||||
Lucene.cleanLuceneIndex(store.directory());
|
||||
si = null;
|
||||
}
|
||||
if (si != null && indexShouldExists == false) {
|
||||
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
|
||||
// its a "new index create" API, we have to do something, so better to clean it than use same data
|
||||
logger.trace("cleaning existing shard, shouldn't exists");
|
||||
Lucene.cleanLuceneIndex(store.directory());
|
||||
si = null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
|
||||
}
|
||||
recoveryState.getIndex().updateVersion(version);
|
||||
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
|
||||
assert indexShouldExists;
|
||||
bootstrap(indexShard, store);
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.transport.TransportChannel;
|
|||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -83,7 +82,7 @@ public class PeerRecoverySourceService implements IndexEventListener {
|
|||
}
|
||||
}
|
||||
|
||||
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) throws IOException {
|
||||
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
|
||||
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
final IndexShard shard = indexService.getShard(request.shardId().id());
|
||||
|
||||
|
|
|
@ -41,10 +41,6 @@ public class RecoveryFailedException extends ElasticsearchException {
|
|||
this(state.getShardId(), state.getSourceNode(), state.getTargetNode(), extraInfo, cause);
|
||||
}
|
||||
|
||||
public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Throwable cause) {
|
||||
this(shardId, sourceNode, targetNode, null, cause);
|
||||
}
|
||||
|
||||
public RecoveryFailedException(ShardId shardId,
|
||||
DiscoveryNode sourceNode,
|
||||
DiscoveryNode targetNode,
|
||||
|
|
|
@ -86,10 +86,6 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
|
|||
return position;
|
||||
}
|
||||
|
||||
public String checksum() {
|
||||
return metaData.checksum();
|
||||
}
|
||||
|
||||
public long length() {
|
||||
return metaData.length();
|
||||
}
|
||||
|
|
|
@ -572,9 +572,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
private long recovered;
|
||||
private boolean reused;
|
||||
|
||||
public File() {
|
||||
}
|
||||
|
||||
public File(String name, long length, boolean reused) {
|
||||
assert name != null;
|
||||
this.name = name;
|
||||
|
@ -672,11 +669,10 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
|
||||
public static class Index extends Timer implements ToXContentFragment, Writeable {
|
||||
|
||||
private Map<String, File> fileDetails = new HashMap<>();
|
||||
private final Map<String, File> fileDetails = new HashMap<>();
|
||||
|
||||
public static final long UNKNOWN = -1L;
|
||||
|
||||
private long version = UNKNOWN;
|
||||
private long sourceThrottlingInNanos = UNKNOWN;
|
||||
private long targetThrottleTimeInNanos = UNKNOWN;
|
||||
|
||||
|
@ -712,7 +708,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
|
||||
public synchronized void reset() {
|
||||
super.reset();
|
||||
version = UNKNOWN;
|
||||
fileDetails.clear();
|
||||
sourceThrottlingInNanos = UNKNOWN;
|
||||
targetThrottleTimeInNanos = UNKNOWN;
|
||||
|
@ -729,10 +724,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
file.addRecoveredBytes(bytes);
|
||||
}
|
||||
|
||||
public synchronized long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public synchronized void addSourceThrottling(long timeInNanos) {
|
||||
if (sourceThrottlingInNanos == UNKNOWN) {
|
||||
sourceThrottlingInNanos = timeInNanos;
|
||||
|
@ -851,16 +842,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
return total;
|
||||
}
|
||||
|
||||
public synchronized long totalReuseBytes() {
|
||||
long total = 0;
|
||||
for (File file : fileDetails.values()) {
|
||||
if (file.reused()) {
|
||||
total += file.length();
|
||||
}
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* percent of bytes recovered out of total files bytes *to be* recovered
|
||||
*/
|
||||
|
@ -904,10 +885,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
return reused;
|
||||
}
|
||||
|
||||
public synchronized void updateVersion(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
// stream size first, as it matters more and the files section can be long
|
||||
|
|
|
@ -160,10 +160,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
|
|||
return store;
|
||||
}
|
||||
|
||||
public RecoveryState.Stage stage() {
|
||||
return state().getStage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
|
||||
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RecoveryWaitForClusterStateRequest extends TransportRequest {
|
||||
|
||||
private long recoveryId;
|
||||
private ShardId shardId;
|
||||
private long clusterStateVersion;
|
||||
|
||||
public RecoveryWaitForClusterStateRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
recoveryId = in.readLong();
|
||||
shardId = new ShardId(in);
|
||||
clusterStateVersion = in.readVLong();
|
||||
}
|
||||
|
||||
RecoveryWaitForClusterStateRequest(long recoveryId, ShardId shardId, long clusterStateVersion) {
|
||||
this.recoveryId = recoveryId;
|
||||
this.shardId = shardId;
|
||||
this.clusterStateVersion = clusterStateVersion;
|
||||
}
|
||||
|
||||
public long recoveryId() {
|
||||
return this.recoveryId;
|
||||
}
|
||||
|
||||
public ShardId shardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
public long clusterStateVersion() {
|
||||
return clusterStateVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(recoveryId);
|
||||
shardId.writeTo(out);
|
||||
out.writeVLong(clusterStateVersion);
|
||||
}
|
||||
}
|
|
@ -90,7 +90,7 @@ public class IndicesStore implements ClusterStateListener, Closeable {
|
|||
// Cache successful shard deletion checks to prevent unnecessary file system lookups
|
||||
private final Set<ShardId> folderNotFoundCache = new HashSet<>();
|
||||
|
||||
private TimeValue deleteShardTimeout;
|
||||
private final TimeValue deleteShardTimeout;
|
||||
|
||||
@Inject
|
||||
public IndicesStore(Settings settings, IndicesService indicesService,
|
||||
|
@ -346,10 +346,7 @@ public class IndicesStore implements ClusterStateListener, Closeable {
|
|||
public void sendResult(boolean shardActive) {
|
||||
try {
|
||||
channel.sendResponse(new ShardActiveResponse(shardActive, clusterService.localNode()));
|
||||
} catch (IOException e) {
|
||||
logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to " +
|
||||
"delete shard {} - shard will probably not be removed", request.shardId), e);
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
} catch (IOException | EsRejectedExecutionException e) {
|
||||
logger.error(() -> new ParameterizedMessage("failed send response for shard active while trying to " +
|
||||
"delete shard {} - shard will probably not be removed", request.shardId), e);
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
|
@ -184,13 +183,11 @@ public abstract class FileRestoreContext {
|
|||
}
|
||||
|
||||
// read the snapshot data persisted
|
||||
final SegmentInfos segmentCommitInfos;
|
||||
try {
|
||||
segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
|
||||
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
|
||||
}
|
||||
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
|
||||
|
||||
/// now, go over and clean files that are in the store, but were not in the snapshot
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue