Recovery: be more resilient to partial network partitions

This commit adds a test that simulates disconnecting nodes and dropping requests during the various stages of recovery, and fixes all the issues it raised. In short:

1) Ongoing recoveries will be scheduled for retry upon network disconnect. The default retry period is 5s (cross-node connections are checked every 10s by default).
2) Sometimes the disconnect happens after the target engine has started (but the shard is still in recovery). For simplicity, I opted to restart the recovery from scratch (little to no files will be copied again, because they were just synced).
3) To protect against dropped requests, a RecoveryMonitor was added that fails a recovery if no progress has been made in the last 30m (by default), matching the long timeouts we use in recovery requests.
4) When a shard fails on a node, we try to assign it to another node. If no such node is available, the shard remains unassigned, causing the target node to clean any in-memory state for it (files on disk remain). At the moment the shard stays unassigned until another cluster state change happens, which would reassign it to the node in question, but if no such change happens the shard remains stuck as unassigned. The commit adds an extra delayed reroute in such cases to make sure the shard will be reassigned.
5) Moved all recovery related settings to RecoverySettings; a short tuning sketch follows this list.
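For illustration only (not part of the commit): the relocated settings are registered as dynamic in ClusterDynamicSettingsModule below, so they can be tuned at runtime through the cluster update settings API, just as the new RecoverySettingsTest does; `client` is assumed to be any connected Client.

client.admin().cluster().prepareUpdateSettings()
        .setTransientSettings(ImmutableSettings.builder()
                .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, "5s")  // delay between retries after network errors
                .put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, "30m"))   // fail recoveries that show no progress
        .get();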

Closes #8720
Boaz Leskes 2014-11-24 13:40:26 +02:00
parent dd8000c865
commit 102e7adfad
19 changed files with 624 additions and 116 deletions

View File

@ -30,17 +30,16 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.io.IOException;
import java.io.Serializable;
/**
*/
@SuppressWarnings("deprecation")
public class Version implements Serializable {
public class Version {
// The logic for ID is: XXYYZZAA, where XX is major version, YY is minor version, ZZ is revision, and AA is Beta/RC indicator
// AA values below 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
// the (internal) format of the id is there so we can easily do after/before checks on the id
// NOTE: indexes created with 3.6 use this constant for e.g. analysis chain emulation (imperfect)
public static final org.apache.lucene.util.Version LUCENE_3_EMULATION_VERSION = org.apache.lucene.util.Version.LUCENE_4_0_0;
@ -423,6 +422,7 @@ public class Version implements Serializable {
/**
* Return the {@link Version} of Elasticsearch that has been used to create an index given its settings.
*
* @throws ElasticsearchIllegalStateException if the given index settings don't contain a value for the key {@value IndexMetaData#SETTING_VERSION_CREATED}
*/
public static Version indexCreated(Settings indexSettings) {
@ -485,7 +485,7 @@ public class Version implements Serializable {
}
return versionFromId;
} catch(NumberFormatException e) {
} catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse version " + version, e);
}
}
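As a side note, the XXYYZZAA comment above can be made concrete with a tiny decoding sketch (illustrative only, not part of the diff; the example id is hypothetical):

int id = 1030299;                  // XXYYZZAA, i.e. version 1.3.2, build 99
int major = id / 1000000;          // XX -> 1
int minor = (id / 10000) % 100;    // YY -> 3
int revision = (id / 100) % 100;   // ZZ -> 2
int build = id % 100;              // AA -> 99; <50 beta, <99 RC, 99 GA release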

View File

@ -23,13 +23,11 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
@ -60,19 +58,19 @@ public class ShardStateAction extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final ThreadPool threadPool;
private final RoutingService routingService;
private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, ThreadPool threadPool) {
AllocationService allocationService, RoutingService routingService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.allocationService = allocationService;
this.threadPool = threadPool;
this.routingService = routingService;
transportService.registerHandler(SHARD_STARTED_ACTION_NAME, new ShardStartedTransportHandler());
transportService.registerHandler(SHARD_FAILED_ACTION_NAME, new ShardFailedTransportHandler());
@ -104,11 +102,11 @@ public class ShardStateAction extends AbstractComponent {
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
}
@ -132,18 +130,19 @@ public class ShardStateAction extends AbstractComponent {
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
}
private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (shardRoutingEntry.processed) {
@ -191,6 +190,14 @@ public class ShardStateAction extends AbstractComponent {
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.scheduleReroute();
}
}
});
}

View File

@ -25,9 +25,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -113,7 +111,8 @@ public class DiscoveryNode implements Streamable, Serializable {
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param version the version of the node.
*/
@ -129,11 +128,12 @@ public class DiscoveryNode implements Streamable, Serializable {
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param attributes node attributes
* @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes, Version version) {
this(nodeName, nodeId, NetworkUtils.getLocalHostName(""), NetworkUtils.getLocalHostAddress(""), address, attributes, version);
@ -147,13 +147,14 @@ public class DiscoveryNode implements Streamable, Serializable {
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param hostName the nodes hostname
* @param hostAddress the nodes host address
* @param address the nodes transport address
* @param attributes node attributes
* @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address, Map<String, String> attributes, Version version) {
if (nodeName != null) {
@ -340,8 +341,9 @@ public class DiscoveryNode implements Streamable, Serializable {
@Override
public boolean equals(Object obj) {
if (!(obj instanceof DiscoveryNode))
if (!(obj instanceof DiscoveryNode)) {
return false;
}
DiscoveryNode other = (DiscoveryNode) obj;
return this.nodeId.equals(other.nodeId);
@ -372,4 +374,19 @@ public class DiscoveryNode implements Streamable, Serializable {
}
return sb.toString();
}
// we need this custom serialization logic because Version is not serializable (because org.apache.lucene.util.Version is not serializable)
private void writeObject(java.io.ObjectOutputStream out)
throws IOException {
StreamOutput streamOutput = new OutputStreamStreamOutput(out);
streamOutput.setVersion(Version.CURRENT.minimumCompatibilityVersion());
this.writeTo(streamOutput);
}
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
StreamInput streamInput = new InputStreamStreamInput(in);
streamInput.setVersion(Version.CURRENT.minimumCompatibilityVersion());
this.readFrom(streamInput);
}
}

View File

@ -89,6 +89,11 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
clusterService.remove(this);
}
/** make sure that a reroute will be done by the next scheduled check */
public void scheduleReroute() {
routingTableDirty = true;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
@ -153,8 +158,12 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
public void onFailure(String source, Throwable t) {
ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) {
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
} else {
logger.error("unexpected failure during [{}], current state version [{}]", t, source, state.version());
}
}
});
routingTableDirty = false;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
@ -77,7 +76,11 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.shard;
import com.google.common.base.Charsets;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Filter;
@ -51,6 +50,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
@ -72,11 +72,7 @@ import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@ -88,7 +84,6 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
@ -102,9 +97,9 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@ -720,6 +715,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
createNewEngine();
}
/** called if recovery has to be restarted after network error / delay */
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
}
}
/**
* The peer recovery state if this shard recovered from a peer shard, null otherwise.
*/

View File

@ -19,13 +19,17 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -42,9 +46,11 @@ public class RecoveriesCollection {
private final ConcurrentMap<Long, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
final private ESLogger logger;
final private ThreadPool threadPool;
public RecoveriesCollection(ESLogger logger) {
public RecoveriesCollection(ESLogger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
}
/**
@ -52,11 +58,14 @@ public class RecoveriesCollection {
*
* @return the id of the new recovery.
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state,
RecoveryTarget.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, state, listener);
RecoveryStatus existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC,
new RecoveryMonitor(status.recoveryId(), status.lastAccessTime(), activityTimeout));
return status.recoveryId();
}
@ -180,5 +189,45 @@ public class RecoveriesCollection {
return status;
}
}
private class RecoveryMonitor extends AbstractRunnable {
private final long recoveryId;
private final TimeValue checkInterval;
private long lastSeenAccessTime;
private RecoveryMonitor(long recoveryId, long lastSeenAccessTime, TimeValue checkInterval) {
this.recoveryId = recoveryId;
this.checkInterval = checkInterval;
this.lastSeenAccessTime = lastSeenAccessTime;
}
@Override
public void onFailure(Throwable t) {
logger.error("unexpected error while monitoring recovery [{}]", t, recoveryId);
}
@Override
protected void doRun() throws Exception {
RecoveryStatus status = onGoingRecoveries.get(recoveryId);
if (status == null) {
logger.trace("[monitor] no status found for [{}], shutting down", recoveryId);
return;
}
long accessTime = status.lastAccessTime();
if (accessTime == lastSeenAccessTime) {
String message = "no activity after [" + checkInterval + "]";
failRecovery(recoveryId,
new RecoveryFailedException(status.state(), message, new ElasticsearchTimeoutException(message)),
true // to be safe, we don't know what got stuck
);
return;
}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", lastSeenAccessTime);
threadPool.schedule(checkInterval, ThreadPool.Names.GENERIC, this);
}
}
}
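The monitor above only sees progress if request handlers bump the status' access time; a minimal sketch of what a recovery request handler is expected to do (illustrative only, mirroring the map lookup in doRun()):

// on every incoming recovery request: mark activity so the RecoveryMonitor
// reschedules itself instead of failing the recovery for inactivity
RecoveryStatus status = onGoingRecoveries.get(recoveryId);
if (status != null) {
    status.setLastAccessTime(); // resets the window checked by the monitor
}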

View File

@ -47,7 +47,31 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
public static final String INDICES_RECOVERY_RETRY_DELAY = "indices.recovery.retry_delay";
/**
how long to wait before retrying after issues caused by cluster state syncing between nodes,
i.e., the local node is not yet known on the remote node, the remote shard is not yet started, etc.
*/
public static final String INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC = "indices.recovery.retry_delay_state_sync";
/** how long to wait before retrying after network related issues */
public static final String INDICES_RECOVERY_RETRY_DELAY_NETWORK = "indices.recovery.retry_delay_network";
/**
recoveries that don't show any activity for more than this interval will be failed.
* defaults to `indices.recovery.internal_action_long_timeout`
*/
public static final String INDICES_RECOVERY_ACTIVITY_TIMEOUT = "indices.recovery.recovery_activity_timeout";
/** timeout value to use for requests made as part of the recovery process */
public static final String INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT = "indices.recovery.internal_action_timeout";
/**
timeout value to use for requests made as part of the recovery process that are expected to take a long time.
* defaults to twice `indices.recovery.internal_action_timeout`.
*/
public static final String INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT = "indices.recovery.internal_action_long_timeout";
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb").bytes();
@ -70,17 +94,35 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
private volatile ByteSizeValue maxBytesPerSec;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelay;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionLongTimeout;
@Inject
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", settings.getAsBytesSize("index.shard.recovery.file_chunk_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
this.translogOps = componentSettings.getAsInt("translog_ops", settings.getAsInt("index.shard.recovery.translog_ops", 1000));
this.translogSize = componentSettings.getAsBytesSize("translog_size", settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
this.compress = componentSettings.getAsBoolean("compress", true);
this.retryDelay = componentSettings.getAsTime("retry_delay", TimeValue.timeValueMillis(500));
this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, settings.getAsBytesSize("index.shard.recovery.file_chunk_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, settings.getAsInt("index.shard.recovery.translog_ops", 1000));
this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true);
this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500));
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.retryDelayNetwork = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_NETWORK, TimeValue.timeValueSeconds(5));
this.internalActionTimeout = settings.getAsTime(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, TimeValue.timeValueMinutes(15));
this.internalActionLongTimeout = settings.getAsTime(INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, new TimeValue(internalActionTimeout.millis() * 2));
this.activityTimeout = settings.getAsTime(INDICES_RECOVERY_ACTIVITY_TIMEOUT,
// default to the internalActionLongTimeout used as timeouts on RecoverySource
internalActionLongTimeout
);
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
@ -136,7 +178,26 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
return rateLimiter;
}
public TimeValue retryDelay() { return retryDelay; }
public TimeValue retryDelayNetwork() {
return retryDelayNetwork;
}
public TimeValue retryDelayStateSync() {
return retryDelayStateSync;
}
public TimeValue activityTimeout() {
return activityTimeout;
}
public TimeValue internalActionTimeout() {
return internalActionTimeout;
}
public TimeValue internalActionLongTimeout() {
return internalActionLongTimeout;
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
@ -191,11 +252,21 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
RecoverySettings.this.concurrentSmallFileStreams = concurrentSmallFileStreams;
RecoverySettings.this.concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams);
}
final TimeValue retryDelay = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay);
if (retryDelay.equals(RecoverySettings.this.retryDelay) == false) {
logger.info("updating [] from [{}] to [{}]",INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay, retryDelay);
RecoverySettings.this.retryDelay = retryDelay;
RecoverySettings.this.retryDelayNetwork = maybeUpdate(RecoverySettings.this.retryDelayNetwork, settings, INDICES_RECOVERY_RETRY_DELAY_NETWORK);
RecoverySettings.this.retryDelayStateSync = maybeUpdate(RecoverySettings.this.retryDelayStateSync, settings, INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC);
RecoverySettings.this.activityTimeout = maybeUpdate(RecoverySettings.this.activityTimeout, settings, INDICES_RECOVERY_ACTIVITY_TIMEOUT);
RecoverySettings.this.internalActionTimeout = maybeUpdate(RecoverySettings.this.internalActionTimeout, settings, INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT);
RecoverySettings.this.internalActionLongTimeout = maybeUpdate(RecoverySettings.this.internalActionLongTimeout, settings, INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT);
}
private TimeValue maybeUpdate(final TimeValue currentValue, final Settings settings, final String key) {
final TimeValue value = settings.getAsTime(key, currentValue);
if (value.equals(currentValue)) {
return currentValue;
}
logger.info("updating [] from [{}] to [{}]", key, currentValue, value);
return value;
}
}
}

View File

@ -58,8 +58,6 @@ public class RecoverySource extends AbstractComponent {
private final ClusterService clusterService;
private final TimeValue internalActionTimeout;
private final TimeValue internalActionLongTimeout;
private final OngoingRecoveres ongoingRecoveries = new OngoingRecoveres();
@ -83,8 +81,6 @@ public class RecoverySource extends AbstractComponent {
this.recoverySettings = recoverySettings;
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
this.internalActionTimeout = componentSettings.getAsTime("internal_action_timeout", TimeValue.timeValueMinutes(15));
this.internalActionLongTimeout = new TimeValue(internalActionTimeout.millis() * 2);
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
@ -117,8 +113,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger);
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);

View File

@ -70,7 +70,11 @@ public class RecoveryStatus extends AbstractRefCounted {
private final CancellableThreads cancellableThreads = new CancellableThreads();
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();
public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
@ -113,6 +117,16 @@ public class RecoveryStatus extends AbstractRefCounted {
return cancellableThreads;
}
/** return the last time this RecoveryStatus was used (based on System.nanoTime()) */
public long lastAccessTime() {
return lastAccessTime;
}
/** sets the lastAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}
public Store store() {
ensureRefCount();
return store;
@ -219,33 +233,42 @@ public class RecoveryStatus extends AbstractRefCounted {
return indexOutput;
}
public void resetRecovery() throws IOException {
cleanOpenFiles();
indexShard().performRecoveryRestart();
}
@Override
protected void closeInternal() {
try {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Throwable t) {
logger.debug("error while closing recovery output [{}]", t, entry.getValue());
}
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
legacyChecksums.clear();
cleanOpenFiles();
} finally {
// free store. increment happens in constructor
store.decRef();
}
}
protected void cleanOpenFiles() {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Throwable t) {
logger.debug("error while closing recovery output [{}]", t, entry.getValue());
}
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
legacyChecksums.clear();
}
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";

View File

@ -46,6 +46,7 @@ import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@ -87,7 +88,7 @@ public class RecoveryTarget extends AbstractComponent {
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger);
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
@ -136,14 +137,19 @@ public class RecoveryTarget extends AbstractComponent {
recoveryState.setSourceNode(sourceNode);
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setPrimary(indexShard.routingEntry().primary());
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener);
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
protected void retryRecovery(final long recoveryId, TimeValue retryAfter) {
logger.trace("will retrying recovery with id [{}] in [{}]", recoveryId, retryAfter);
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId));
protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
logger.trace("will retrying recovery with id [{}] in [{}] (reason [{}])", recoveryStatus.recoveryId(), retryAfter, reason);
try {
recoveryStatus.resetRecovery();
} catch (IOException e) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(currentRequest, e), true);
}
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
}
private void doRecovery(final RecoveryStatus recoveryStatus) {
@ -204,6 +210,7 @@ public class RecoveryTarget extends AbstractComponent {
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
@ -223,17 +230,18 @@ public class RecoveryTarget extends AbstractComponent {
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
retryRecovery(recoveryStatus, "remote shard not ready", recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
retryRecovery(recoveryStatus, cause.getMessage(), recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof ConnectTransportException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source node disconnected", cause), false);
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", recoveryStatus.shardId(), recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryStatus, cause.getMessage(), recoverySettings.retryDelayNetwork(), request);
return;
}

View File

@ -81,8 +81,6 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final TimeValue internalActionTimeout;
private final TimeValue internalActionLongTimeout;
private final ClusterService clusterService;
private final IndexService indexService;
private final MappingUpdatedAction mappingUpdatedAction;
@ -106,16 +104,13 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
public ShardRecoveryHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final TimeValue internalActionTimeout,
final TimeValue internalActionLongTimeout, final ClusterService clusterService,
final TransportService transportService, final ClusterService clusterService,
final IndicesService indicesService, final MappingUpdatedAction mappingUpdatedAction, final ESLogger logger) {
this.shard = shard;
this.request = request;
this.recoverySettings = recoverySettings;
this.logger = logger;
this.transportService = transportService;
this.internalActionTimeout = internalActionTimeout;
this.internalActionLongTimeout = internalActionLongTimeout;
this.clusterService = clusterService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
@ -203,7 +198,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(internalActionTimeout),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@ -266,7 +261,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
final TransportRequestOptions requestOptions = TransportRequestOptions.options()
.withCompress(shouldCompressRequest)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(internalActionTimeout);
.withTimeout(recoverySettings.internalActionTimeout());
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
@ -351,7 +346,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
TransportRequestOptions.options().withTimeout(internalActionTimeout),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@ -393,7 +388,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@ -450,7 +445,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// during this time
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.options().withTimeout(internalActionLongTimeout),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@ -546,9 +541,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
@Override
public void run() throws InterruptedException {
try {
if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
if (!updatedOnMaster.await(recoverySettings.internalActionTimeout().millis(), TimeUnit.MILLISECONDS)) {
logger.debug("[{}][{}] recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]",
indexName, shardId, request.targetNode(), internalActionTimeout);
indexName, shardId, request.targetNode(), recoverySettings.internalActionTimeout());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -576,7 +571,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(internalActionLongTimeout);
.withTimeout(recoverySettings.internalActionLongTimeout());
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) {

View File

@ -76,6 +76,7 @@ public class VersionTests extends ElasticsearchTestCase {
assertThat(version.luceneVersion, sameInstance(Version.fromId(version.id).luceneVersion));
}
}
@Test
public void testCURRENTIsLatest() {
final int iters = scaledRandomIntBetween(100, 1000);

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DiscoveryNodeTests extends ElasticsearchTestCase {
@Test
public void testJavaSerializability() throws IOException, ClassNotFoundException {
final int iters = scaledRandomIntBetween(100, 300);
for (int i = 0; i < iters; i++) {
final String id = randomUnicodeOfLengthBetween(3, 20);
final String nodeName = randomUnicodeOfLengthBetween(3, 20);
final String hostName = randomUnicodeOfLengthBetween(3, 20);
final String hostAddress = randomUnicodeOfLengthBetween(3, 20);
final TransportAddress transportAddress = new LocalTransportAddress(randomUnicodeOfLengthBetween(3, 20));
final Map<String, String> attributes = new HashMap<>();
for (int a = randomInt(10); a > 0; a--) {
attributes.put(randomUnicodeOfLengthBetween(3, 20), randomUnicodeOfLengthBetween(3, 20));
}
final Version version = randomVersion();
DiscoveryNode discoveryNode = new DiscoveryNode(nodeName, id, hostName, hostAddress, transportAddress, attributes, version);
BytesStreamOutput bytesOutput = new BytesStreamOutput();
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(bytesOutput);
too.writeObject(discoveryNode);
too.close();
ThrowableObjectInputStream from = new ThrowableObjectInputStream(new BytesStreamInput(bytesOutput.bytes()));
DiscoveryNode readDiscoveryNode = (DiscoveryNode) from.readObject();
from.close();
assertThat(readDiscoveryNode, Matchers.equalTo(discoveryNode));
assertThat(readDiscoveryNode.id(), Matchers.equalTo(id));
assertThat(readDiscoveryNode.name(), Matchers.equalTo(nodeName));
assertThat(readDiscoveryNode.getHostName(), Matchers.equalTo(hostName));
assertThat(readDiscoveryNode.getHostAddress(), Matchers.equalTo(hostAddress));
assertThat(readDiscoveryNode.address(), Matchers.equalTo(transportAddress));
assertThat(readDiscoveryNode.attributes(), Matchers.equalTo(attributes));
assertThat(readDiscoveryNode.version(), Matchers.equalTo(version));
}
}
}

View File

@ -22,8 +22,8 @@ package org.elasticsearch.index.engine.internal;
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;

View File

@ -20,15 +20,25 @@
package org.elasticsearch.indices.recovery;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.indices.recovery.RecoveryState.Type;
@ -36,16 +46,22 @@ import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
/**
@ -412,4 +428,116 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
assertThat(indexState.percentBytesRecovered(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.percentBytesRecovered(), lessThanOrEqualTo(100.0f));
}
@Test
public void disconnectsWhileRecoveringTest() throws Exception {
final String indexName = "test";
final Settings nodeSettings = ImmutableSettings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, "100ms")
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, "1s")
.put("cluster.routing.schedule", "100ms") // aggressive reroute post shard failures
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false) // restarted recoveries will delete temp files and write them again
.build();
// start a master node
internalCluster().startNode(nodeSettings);
ListenableFuture<String> blueFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "blue").put(nodeSettings).build());
ListenableFuture<String> redFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "red").put(nodeSettings).build());
final String blueNodeName = blueFuture.get();
final String redNodeName = redFuture.get();
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));
client().admin().indices().prepareCreate(indexName)
.setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
}
indexRandom(true, requests);
ensureSearchable(indexName);
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
final String blueNodeId = internalCluster().getInstance(DiscoveryService.class, blueNodeName).localNode().id();
assertFalse(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
SearchResponse searchResponse = client().prepareSearch(indexName).get();
assertHitCount(searchResponse, numDocs);
String[] recoveryActions = new String[]{
RecoverySource.Actions.START_RECOVERY,
RecoveryTarget.Actions.FILES_INFO,
RecoveryTarget.Actions.FILE_CHUNK,
RecoveryTarget.Actions.CLEAN_FILES,
//RecoveryTarget.Actions.TRANSLOG_OPS, <-- may not be sent if already flushed
RecoveryTarget.Actions.PREPARE_TRANSLOG,
RecoveryTarget.Actions.FINALIZE
};
final String recoveryActionToBlock = randomFrom(recoveryActions);
final boolean dropRequests = randomBoolean();
logger.info("--> will {} between blue & red on [{}]", dropRequests ? "drop requests" : "break connection", recoveryActionToBlock);
MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
DiscoveryNode redDiscoNode = internalCluster().getInstance(ClusterService.class, redNodeName).localNode();
DiscoveryNode blueDiscoNode = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode();
final CountDownLatch requestBlocked = new CountDownLatch(1);
blueMockTransportService.addDelegate(redDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
redMockTransportService.addDelegate(blueDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
logger.info("--> starting recovery from blue to red");
client().admin().indices().prepareUpdateSettings(indexName).setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red,blue")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
).get();
requestBlocked.await();
logger.info("--> stopping to block recovery");
blueMockTransportService.clearAllRules();
redMockTransportService.clearAllRules();
ensureGreen();
searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get();
assertHitCount(searchResponse, numDocs);
}
private class RecoveryActionBlocker extends MockTransportService.DelegateTransport {
private final boolean dropRequests;
private final String recoveryActionToBlock;
private final CountDownLatch requestBlocked;
public RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, Transport delegate, CountDownLatch requestBlocked) {
super(delegate);
this.dropRequests = dropRequests;
this.recoveryActionToBlock = recoveryActionToBlock;
this.requestBlocked = requestBlocked;
}
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
logger.info("--> preventing {} request", action);
requestBlocked.countDown();
if (dropRequests) {
return;
}
throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request");
}
transport.sendRequest(node, requestId, action, request, options);
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.recovery;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
public class RecoverySettingsTest extends ElasticsearchSingleNodeTest {
@Override
protected boolean resetNodeAfterTest() {
return true;
}
@Test
public void testAllSettingsAreDynamicallyUpdatable() {
innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.translogOps());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.concurrentStreamPool().getMaximumPoolSize());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.concurrentSmallFileStreamPool().getMaximumPoolSize());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, 0, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(null, recoverySettings.rateLimiter());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.retryDelayStateSync().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.retryDelayNetwork().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.activityTimeout().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.internalActionTimeout().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, randomIntBetween(1, 200), new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
assertEquals(expectedValue, recoverySettings.compress());
}
});
}
private static class Validator {
public void validate(RecoverySettings recoverySettings, int expectedValue) {
}
public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
}
}
private void innerTestSettings(String key, int newValue, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder().put(key, newValue)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
private void innerTestSettings(String key, boolean newValue, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder().put(key, newValue)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
}

View File

@ -532,9 +532,6 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
// if (action.equals(RecoveryTarget.Actions.PREPARE_TRANSLOG)) {
// logger.debug("dropped [{}] to {}", action, node);
//} else
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
@ -550,5 +547,5 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
}
}
}
}

View File

@ -47,10 +47,10 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.FileSystemUtils;
@ -293,14 +293,13 @@ public final class InternalTestCluster extends TestCluster {
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 5, 10));
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 10, 25))); // more shared - we need to retry more often
} else if (random.nextInt(100) <= 90) {
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 2, 5));
}
// always reduce this - it can make tests really slow
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
defaultSettings = builder.build();
executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
}