Remove shadow replicas

Resolves #22024
This commit is contained in:
Lee Hinman 2017-04-03 14:59:44 -06:00
parent 76603d4413
commit 5cace8e48a
53 changed files with 86 additions and 3237 deletions

View File

@ -63,9 +63,4 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
}
@Override
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return true;
}
}

View File

@ -66,9 +66,4 @@ public class TransportShardRefreshAction
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
}
@Override
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return true;
}
}

View File

@ -68,13 +68,6 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
if (request.request().realtime && // if the realtime flag is set
request.request().preference() == null && // the preference flag is not already set
indexMeta != null && // and we have the index
indexMeta.isIndexUsingShadowReplicas()) { // and the index uses shadow replicas
// set the preference for the request to use "_primary" automatically
request.request().preference(Preference.PRIMARY.type());
}
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().parent(), request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.

View File

@ -74,7 +74,6 @@ public class ReplicationOperation<
*/
private final AtomicInteger pendingActions = new AtomicInteger();
private final AtomicInteger successfulShards = new AtomicInteger();
private final boolean executeOnReplicas;
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
@ -86,9 +85,8 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, Replicas<ReplicaRequest> replicas,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, Logger logger, String opType) {
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
@ -160,7 +158,7 @@ public class ReplicationOperation<
final String localNodeId = primary.routingEntry().currentNodeId();
// If the index gets deleted after primary operation, we skip replication
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.unassigned()) {
if (shard.primary() == false) {
totalShards.incrementAndGet();
}

View File

@ -319,11 +319,10 @@ public abstract class TransportReplicationAction<
} else {
setPhase(replicationTask, "primary");
final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference, executeOnReplicas)
primaryShardReference)
.execute();
}
} catch (Exception e) {
@ -371,9 +370,9 @@ public abstract class TransportReplicationAction<
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName);
replicasProxy, clusterService::state, logger, actionName);
}
}
@ -909,14 +908,6 @@ public abstract class TransportReplicationAction<
indexShard.acquirePrimaryOperationLock(onAcquired, executor);
}
/**
* Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase
* will be skipped. For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
*/
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return indexMetaData.isIndexUsingShadowReplicas() == false;
}
class ShardReference implements Releasable {
protected final IndexShard indexShard;

View File

@ -383,13 +383,6 @@ public class InternalClusterInfoService extends AbstractComponent
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
}
if (indexMeta != null && indexMeta.isIndexUsingShadowReplicas()) {
// Shards on a shared filesystem should be considered of size 0
if (logger.isTraceEnabled()) {
logger.trace("shard: {} is using shadow replicas and will be treated as size 0", sid);
}
size = 0;
}
newShardSizes.put(sid, size);
}
}

View File

@ -192,18 +192,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final Setting<Integer> INDEX_NUMBER_OF_REPLICAS_SETTING =
Setting.intSetting(SETTING_NUMBER_OF_REPLICAS, 1, 0, Property.Dynamic, Property.IndexScope);
public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
public static final Setting<Boolean> INDEX_SHADOW_REPLICAS_SETTING =
Setting.boolSetting(SETTING_SHADOW_REPLICAS, false, Property.IndexScope, Property.Deprecated);
public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING =
Setting.intSetting(SETTING_ROUTING_PARTITION_SIZE, 1, 1, Property.IndexScope);
public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
public static final Setting<Boolean> INDEX_SHARED_FILESYSTEM_SETTING =
Setting.boolSetting(SETTING_SHARED_FILESYSTEM, INDEX_SHADOW_REPLICAS_SETTING, Property.IndexScope, Property.Deprecated);
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
public static final String SETTING_READ_ONLY = "index.blocks.read_only";
@ -240,10 +233,6 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
public static final String SETTING_DATA_PATH = "index.data_path";
public static final Setting<String> INDEX_DATA_PATH_SETTING =
new Setting<>(SETTING_DATA_PATH, "", Function.identity(), Property.IndexScope);
public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node";
public static final Setting<Boolean> INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING =
Setting.boolSetting(SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false,
Property.Dynamic, Property.IndexScope, Property.Deprecated);
public static final String INDEX_UUID_NA_VALUE = "_na_";
public static final String INDEX_ROUTING_REQUIRE_GROUP_PREFIX = "index.routing.allocation.require";
@ -1237,35 +1226,6 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
}
}
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(ESLoggerFactory.getLogger(IndexMetaData.class));
/**
* Returns <code>true</code> iff the given settings indicate that the index
* associated with these settings allocates it's shards on a shared
* filesystem. Otherwise <code>false</code>. The default setting for this
* is the returned value from
* {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
*/
public boolean isOnSharedFilesystem(Settings settings) {
// don't use the setting directly, not to trigger verbose deprecation logging
return settings.getAsBooleanLenientForPreEs6Indices(
this.indexCreatedVersion, SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings), deprecationLogger);
}
/**
* Returns <code>true</code> iff the given settings indicate that the index associated
* with these settings uses shadow replicas. Otherwise <code>false</code>. The default
* setting for this is <code>false</code>.
*/
public boolean isIndexUsingShadowReplicas() {
return isIndexUsingShadowReplicas(this.settings);
}
public boolean isIndexUsingShadowReplicas(Settings settings) {
// don't use the setting directly, not to trigger verbose deprecation logging
return settings.getAsBooleanLenientForPreEs6Indices(this.indexCreatedVersion, SETTING_SHADOW_REPLICAS, false, deprecationLogger);
}
/**
* Adds human readable version and creation date settings.
* This method is used to display the settings in a human readable format in REST API

View File

@ -433,10 +433,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.put(indexMetaData, false)
.build();
String maybeShadowIndicator = indexMetaData.isIndexUsingShadowReplicas() ? "s" : "";
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}{}], mappings {}",
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
request.index(), request.cause(), templateNames, indexMetaData.getNumberOfShards(),
indexMetaData.getNumberOfReplicas(), maybeShadowIndicator, mappings.keySet());
indexMetaData.getNumberOfReplicas(), mappings.keySet());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks().isEmpty()) {

View File

@ -139,8 +139,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
"allocation set " + inSyncAllocationIds);
}
if (indexMetaData.isIndexUsingShadowReplicas() == false && // see #20650
shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
if (shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false &&
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +

View File

@ -567,9 +567,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
ShardRouting activeReplica = activeReplica(failedShard.shardId());
assert activeReplica == null || indexMetaData.isIndexUsingShadowReplicas() :
"initializing primary [" + failedShard + "] with active replicas [" + activeReplica + "] only expected when " +
"using shadow replicas";
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
@ -626,10 +623,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
assert activeReplica.started() : "replica relocation should have been cancelled: " + activeReplica;
ShardRouting primarySwappedCandidate = promoteActiveReplicaShardToPrimary(activeReplica);
routingChangesObserver.replicaPromoted(activeReplica);
if (indexMetaData.isIndexUsingShadowReplicas()) {
ShardRouting initializedShard = reinitShadowPrimary(primarySwappedCandidate);
routingChangesObserver.startedPrimaryReinitialized(primarySwappedCandidate, initializedShard);
}
}
/**

View File

@ -368,7 +368,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
TribeService.TRIBE_NAME_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH,
OsService.REFRESH_INTERVAL_SETTING,
ProcessService.REFRESH_INTERVAL_SETTING,
JvmService.REFRESH_INTERVAL_SETTING,

View File

@ -70,13 +70,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING,
IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING,
IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetaData.INDEX_SHADOW_REPLICAS_SETTING,
IndexMetaData.INDEX_SHARED_FILESYSTEM_SETTING,
IndexMetaData.INDEX_READ_ONLY_SETTING,
IndexMetaData.INDEX_BLOCKS_READ_SETTING,
IndexMetaData.INDEX_BLOCKS_WRITE_SETTING,
IndexMetaData.INDEX_BLOCKS_METADATA_SETTING,
IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING,
IndexMetaData.INDEX_PRIORITY_SETTING,
IndexMetaData.INDEX_DATA_PATH_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,

View File

@ -157,13 +157,6 @@ public final class NodeEnvironment implements Closeable {
public static final Setting<Integer> MAX_LOCAL_STORAGE_NODES_SETTING = Setting.intSetting("node.max_local_storage_nodes", 1, 1,
Property.NodeScope);
/**
* If true automatically append node lock id to custom data paths.
*/
public static final Setting<Boolean> ADD_NODE_LOCK_ID_TO_CUSTOM_PATH =
Setting.boolSetting("node.add_lock_id_to_custom_path", true, Property.NodeScope);
/**
* Seed for determining a persisted unique uuid of this node. If the node has already a persisted uuid on disk,
* this seed will be ignored and the uuid from disk will be reused.
@ -922,11 +915,7 @@ public final class NodeEnvironment implements Closeable {
if (customDataDir != null) {
// This assert is because this should be caught by MetaDataCreateIndexService
assert sharedDataPath != null;
if (ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.get(indexSettings.getNodeSettings())) {
return sharedDataPath.resolve(customDataDir).resolve(Integer.toString(this.nodeLockId));
} else {
return sharedDataPath.resolve(customDataDir);
}
} else {
throw new IllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available");
}

View File

@ -106,11 +106,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(unassignedShard.index());
final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(unassignedShard.id());
final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);
assert inSyncAllocationIds.isEmpty() == false;
// use in-sync allocation ids to select nodes
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
@ -122,10 +121,6 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
logger.debug("[{}][{}]: missing local data, will restore from [{}]",
unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
return AllocateUnassignedDecision.NOT_TAKEN;
} else if (recoverOnAnyNode) {
// let BalancedShardsAllocator take care of allocating this shard
logger.debug("[{}][{}]: missing local data, recover from any node", unassignedShard.index(), unassignedShard.id());
return AllocateUnassignedDecision.NOT_TAKEN;
} else {
// We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
// We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
@ -331,19 +326,6 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
Collections.unmodifiableList(noNodeShards));
}
/**
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node
*/
private boolean recoverOnAnyNode(IndexMetaData metaData) {
// don't use the setting directly, not to trigger verbose deprecation logging
return (metaData.isOnSharedFilesystem(metaData.getSettings()) || metaData.isOnSharedFilesystem(this.settings))
&& (metaData.getSettings().getAsBooleanLenientForPreEs6Indices(
metaData.getCreationVersion(), IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, deprecationLogger) ||
this.settings.getAsBooleanLenientForPreEs6Indices
(metaData.getCreationVersion(), IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, deprecationLogger));
}
protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
private static class NodeShardsResult {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
@ -343,8 +342,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
logger.debug("creating shard_id {}", shardId);
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = this.indexSettings.isOnSharedFilesystem() == false ||
(primary && this.indexSettings.isOnSharedFilesystem());
final Engine.Warmer engineWarmer = (searcher) -> {
IndexShard shard = getShardOrNull(shardId.getId());
if (shard != null) {
@ -352,18 +349,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
};
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
if (useShadowEngine(primary, this.indexSettings)) {
indexShard = new ShadowIndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners);
// no indexing listeners - shadow engines don't index
} else {
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
() -> globalCheckpointSyncer.accept(shardId),
searchOperationListeners, indexingOperationListeners);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@ -381,10 +371,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
static boolean useShadowEngine(boolean primary, IndexSettings indexSettings) {
return primary == false && indexSettings.isShadowReplicaIndex();
}
@Override
public synchronized void removeShard(int shardId, String reason) {
final ShardId sId = new ShardId(index(), shardId);
@ -438,17 +424,15 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
private void onShardClose(ShardLock lock, boolean ownsShard) {
private void onShardClose(ShardLock lock) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
if (ownsShard) {
try {
eventListener.beforeIndexShardDeleted(lock.getShardId(), indexSettings.getSettings());
} finally {
shardStoreDeleter.deleteShardStore("delete index", lock, indexSettings);
eventListener.afterIndexShardDeleted(lock.getShardId(), indexSettings.getSettings());
}
}
} catch (IOException e) {
shardStoreDeleter.addPendingDelete(lock.getShardId(), indexSettings);
logger.debug(
@ -514,12 +498,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
private final Closeable[] toClose;
StoreCloseListener(ShardId shardId, boolean ownsShard, Closeable... toClose) {
StoreCloseListener(ShardId shardId, Closeable... toClose) {
this.shardId = shardId;
this.ownsShard = ownsShard;
this.toClose = toClose;
}
@ -527,7 +509,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
public void handle(ShardLock lock) {
try {
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock, ownsShard);
onShardClose(lock);
} finally {
try {
IOUtils.close(toClose);

View File

@ -160,7 +160,6 @@ public final class IndexSettings {
private final String nodeName;
private final Settings nodeSettings;
private final int numberOfShards;
private final boolean isShadowReplicaIndex;
// volatile fields are updated via #updateIndexMetaData(IndexMetaData) under lock
private volatile Settings settings;
private volatile IndexMetaData indexMetaData;
@ -257,7 +256,6 @@ public final class IndexSettings {
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetaData = indexMetaData;
numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);
isShadowReplicaIndex = indexMetaData.isIndexUsingShadowReplicas(settings);
this.defaultField = DEFAULT_FIELD_SETTING.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
@ -359,15 +357,6 @@ public final class IndexSettings {
return settings.get(IndexMetaData.SETTING_DATA_PATH);
}
/**
* Returns <code>true</code> iff the given settings indicate that the index
* associated with these settings allocates it's shards on a shared
* filesystem.
*/
public boolean isOnSharedFilesystem() {
return indexMetaData.isOnSharedFilesystem(getSettings());
}
/**
* Returns the version the index was created on.
* @see Version#indexCreated(Settings)
@ -400,12 +389,6 @@ public final class IndexSettings {
*/
public int getNumberOfReplicas() { return settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, null); }
/**
* Returns <code>true</code> iff this index uses shadow replicas.
* @see IndexMetaData#isIndexUsingShadowReplicas(Settings)
*/
public boolean isShadowReplicaIndex() { return isShadowReplicaIndex; }
/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.

View File

@ -25,5 +25,4 @@ public interface EngineFactory {
Engine newReadWriteEngine(EngineConfig config);
Engine newReadOnlyEngine(EngineConfig config);
}

View File

@ -24,9 +24,4 @@ public class InternalEngineFactory implements EngineFactory {
public Engine newReadWriteEngine(EngineConfig config) {
return new InternalEngine(config);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
return new ShadowEngine(config);
}
}

View File

@ -1,282 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
/**
* ShadowEngine is a specialized engine that only allows read-only operations
* on the underlying Lucene index. An {@code IndexReader} is opened instead of
* an {@code IndexWriter}. All methods that would usually perform write
* operations are no-ops, this means:
*
* - No operations are written to or read from the translog
* - Create, Index, and Delete do nothing
* - Flush does not fsync any files, or make any on-disk changes
*
* In order for new segments to become visible, the ShadowEngine may perform
* stage1 of the traditional recovery process (copying segment files) from a
* regular primary (which uses {@link org.elasticsearch.index.engine.InternalEngine})
*
* Notice that since this Engine does not deal with the translog, any
* {@link #get(Get get)} request goes directly to the searcher, meaning it is
* non-realtime.
*/
public class ShadowEngine extends Engine {
/** how long to wait for an index to exist */
public static final String NONEXISTENT_INDEX_RETRY_WAIT = "index.shadow.wait_for_initial_commit";
public static final TimeValue DEFAULT_NONEXISTENT_INDEX_RETRY_WAIT = TimeValue.timeValueSeconds(5);
private volatile SearcherManager searcherManager;
private volatile SegmentInfos lastCommittedSegmentInfos;
public ShadowEngine(EngineConfig engineConfig) {
super(engineConfig);
if (engineConfig.getRefreshListeners() != null) {
throw new IllegalArgumentException("ShadowEngine doesn't support RefreshListeners");
}
SearcherFactory searcherFactory = new EngineSearcherFactory(engineConfig);
final long nonexistentRetryTime = engineConfig.getIndexSettings().getSettings()
.getAsTime(NONEXISTENT_INDEX_RETRY_WAIT, DEFAULT_NONEXISTENT_INDEX_RETRY_WAIT)
.getMillis();
try {
DirectoryReader reader = null;
store.incRef();
boolean success = false;
try {
if (Lucene.waitForIndex(store.directory(), nonexistentRetryTime)) {
reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(store.directory()), shardId);
this.searcherManager = new SearcherManager(reader, searcherFactory);
this.lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
success = true;
} else {
throw new IllegalStateException("failed to open a shadow engine after" +
nonexistentRetryTime + "ms, " +
"directory is not an index");
}
} catch (Exception e) {
logger.warn("failed to create new reader", e);
throw e;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(reader);
store.decRef();
}
}
} catch (IOException ex) {
throw new EngineCreationFailureException(shardId, "failed to open index reader", ex);
}
logger.trace("created new ShadowEngine");
}
@Override
public IndexResult index(Index index) {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}
@Override
public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
}
@Override
public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException(shardId + " no-op operation not allowed on shadow engine");
}
@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");
}
@Override
public CommitId flush() throws EngineException {
return flush(false, false);
}
@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.trace("skipping FLUSH on shadow engine");
// reread the last committed segment infos
refresh("flush");
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try (ReleasableLock lock = readLock.acquire()) {
// reread the last committed segment infos
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
} catch (Exception e) {
if (isClosed.get() == false) {
logger.warn("failed to read latest segment infos on flush", e);
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
return new CommitId(lastCommittedSegmentInfos.getId());
}
@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException {
// no-op
logger.trace("skipping FORCE-MERGE on shadow engine");
}
@Override
public GetResult get(Get get, Function<String, Searcher> searcherFacotry) throws EngineException {
// There is no translog, so we can get it directly from the searcher
return getFromSearcher(get, searcherFacotry);
}
@Override
public Translog getTranslog() {
throw new UnsupportedOperationException("shadow engines don't have translogs");
}
@Override
public List<Segment> segments(boolean verbose) {
try (ReleasableLock lock = readLock.acquire()) {
Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose);
for (int i = 0; i < segmentsArr.length; i++) {
// hard code all segments as committed, because they are in
// order for the shadow replica to see them
segmentsArr[i].committed = true;
}
return Arrays.asList(segmentsArr);
}
}
@Override
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}
@Override
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
}
@Override
protected void closeNoLock(String reason) {
if (isClosed.compareAndSet(false, true)) {
try {
logger.debug("shadow replica close searcher manager refCount: {}", store.refCount());
IOUtils.close(searcherManager);
} catch (Exception e) {
logger.warn("shadow replica failed to close searcher manager", e);
} finally {
store.decRef();
}
}
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
@Override
public long getIndexBufferRAMBytesUsed() {
// No IndexWriter nor version map
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
@Override
public void writeIndexingBuffer() {
// No indexing buffer
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
@Override
public void activateThrottling() {
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
@Override
public void deactivateThrottling() {
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
@Override
public SequenceNumbersService seqNoService() {
throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers");
}
@Override
public boolean isThrottled() {
return false;
}
@Override
public long getIndexThrottleTimeInMillis() {
return 0L;
}
@Override
public Engine recoverFromTranslog() throws IOException {
throw new UnsupportedOperationException("can't recover on a shadow engine");
}
}

View File

@ -221,10 +221,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
private final AtomicBoolean active = new AtomicBoolean();
/**
* Allows for the registration of listeners that are called when a change becomes visible for search. This is nullable because
* {@linkplain ShadowIndexShard} doesn't support this.
* Allows for the registration of listeners that are called when a change becomes visible for search.
*/
@Nullable
private final RefreshListeners refreshListeners;
public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
@ -1925,9 +1923,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Build {@linkplain RefreshListeners} for this shard. Protected so {@linkplain ShadowIndexShard} can override it to return null.
* Build {@linkplain RefreshListeners} for this shard.
*/
protected RefreshListeners buildRefreshListeners() {
private RefreshListeners buildRefreshListeners() {
return new RefreshListeners(
indexSettings::getMaxRefreshListeners,
() -> refresh("too_many_listeners"),

View File

@ -1,144 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
/**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
* from the primary when a flush happens. It also ensures that a replica being
* promoted to a primary causes the shard to fail, kicking off a re-allocation
* of the primary shard.
*/
public final class ShadowIndexShard extends IndexShard {
public ShadowIndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper,
ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer,
List<SearchOperationListener> searchOperationListeners) throws IOException {
super(shardRouting, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, () -> {
}, searchOperationListeners, Collections.emptyList());
}
/**
* In addition to the regular accounting done in
* {@link IndexShard#updateRoutingEntry(ShardRouting)},
* if this shadow replica needs to be promoted to a primary, the shard is
* failed in order to allow a new primary to be re-allocated.
*/
@Override
public void updateRoutingEntry(ShardRouting newRouting) throws IOException {
if (newRouting.primary()) {// becoming a primary
throw new IllegalStateException("can't promote shard to primary");
}
super.updateRoutingEntry(newRouting);
}
@Override
public MergeStats mergeStats() {
return new MergeStats();
}
@Override
public SeqNoStats seqNoStats() {
return null;
}
@Override
public boolean canIndex() {
return false;
}
@Override
protected Engine newEngine(EngineConfig config) {
assert this.shardRouting.primary() == false;
assert config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
return engineFactory.newReadOnlyEngine(config);
}
@Override
protected RefreshListeners buildRefreshListeners() {
// ShadowEngine doesn't have a translog so it shouldn't try to support RefreshListeners.
return null;
}
@Override
public boolean shouldFlush() {
// we don't need to flush since we don't write - all dominated by the primary
return false;
}
@Override
public TranslogStats translogStats() {
return null; // shadow engine has no translog
}
@Override
public void updateGlobalCheckpointOnReplica(long checkpoint) {
}
@Override
public long getLocalCheckpoint() {
return -1;
}
@Override
public long getGlobalCheckpoint() {
return -1;
}
@Override
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
throw new UnsupportedOperationException("Can't listen for a refresh on a shadow engine because it doesn't have a translog");
}
@Override
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us");
}
@Override
protected void onNewEngine(Engine newEngine) {
// nothing to do here - the superclass sets the translog on some listeners but we don't have such a thing
}
}

View File

@ -28,7 +28,6 @@ import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SleepingLockWrapper;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -74,9 +73,6 @@ public class FsDirectoryService extends DirectoryService {
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions);
if (indexSettings.isOnSharedFilesystem()) {
wrapped = new SleepingLockWrapper(wrapped, 5000);
}
return wrapped;
}

View File

@ -424,11 +424,11 @@ public class IndicesService extends AbstractLifecycleComponent
IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
logger.debug("creating Index [{}], shards [{}]/[{}{}] - reason [{}]",
logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
reason);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
@ -732,17 +732,12 @@ public class IndicesService extends AbstractLifecycleComponent
* @return true if the index can be deleted on this node
*/
public boolean canDeleteIndexContents(Index index, IndexSettings indexSettings) {
// index contents can be deleted if the index is not on a shared file system,
// or if its on a shared file system but its an already closed index (so all
// its resources have already been relinquished)
if (indexSettings.isOnSharedFilesystem() == false || indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE) {
// index contents can be deleted if its an already closed index (so all its resources have
// already been relinquished)
final IndexService indexService = indexService(index);
if (indexService == null && nodeEnv.hasNodeFile()) {
return true;
}
} else {
logger.trace("{} skipping index directory deletion due to shadow replicas", index);
}
return false;
}
@ -789,7 +784,6 @@ public class IndicesService extends AbstractLifecycleComponent
FOLDER_FOUND_CAN_DELETE, // shard data exists and can be deleted
STILL_ALLOCATED, // the shard is still allocated / active on this node
NO_FOLDER_FOUND, // the shards data locations do not exist
SHARED_FILE_SYSTEM, // the shard is located on shared and should not be deleted
NO_LOCAL_STORAGE // node does not have local storage (see DiscoveryNode.nodeRequiresLocalStorage)
}
@ -802,7 +796,6 @@ public class IndicesService extends AbstractLifecycleComponent
public ShardDeletionCheckResult canDeleteShardContent(ShardId shardId, IndexSettings indexSettings) {
assert shardId.getIndex().equals(indexSettings.getIndex());
final IndexService indexService = indexService(shardId.getIndex());
if (indexSettings.isOnSharedFilesystem() == false) {
if (nodeEnv.hasNodeFile()) {
final boolean isAllocated = indexService != null && indexService.hasShard(shardId.id());
if (isAllocated) {
@ -823,10 +816,6 @@ public class IndicesService extends AbstractLifecycleComponent
} else {
return ShardDeletionCheckResult.NO_LOCAL_STORAGE;
}
} else {
logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId);
return ShardDeletionCheckResult.SHARED_FILE_SYSTEM;
}
}
private IndexSettings buildIndexSettings(IndexMetaData metaData) {

View File

@ -197,13 +197,8 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), targetAllocationId, transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, settings);
} else {
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), settings);
}
return handler;
}

View File

@ -286,13 +286,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
*/
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
try {
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the
// primary changing files underneath us
return Store.MetadataSnapshot.EMPTY;
} else {
return recoveryTarget.indexShard().snapshotStoreMetadata();
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
logger.trace("{} shard folder empty, recovering all files", recoveryTarget);

View File

@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A recovery handler that skips phase one as well as sending the translog snapshot.
*/
public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries, Settings nodeSettings) {
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, nodeSettings);
this.shard = shard;
this.request = request;
}
@Override
public RecoveryResponse recoverToTarget() throws IOException {
boolean engineClosed = false;
try {
logger.trace("recovery [phase1]: skipping phase1 for shared filesystem");
final long maxUnsafeAutoIdTimestamp = shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp();
if (request.isPrimaryRelocation()) {
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
try {
// if we relocate we need to close the engine in order to open a new
// IndexWriter on the other end of the relocation
engineClosed = true;
shard.flushAndCloseEngine();
} catch (IOException e) {
logger.warn("close engine failed", e);
shard.failShard("failed to close engine (phase1)", e);
}
}
prepareTargetForTranslog(0, maxUnsafeAutoIdTimestamp);
finalizeRecovery();
return response;
} catch (Exception e) {
if (engineClosed) {
// If the relocation fails then the primary is closed and can't be
// used anymore... (because it's closed) that's a problem, so in
// that case, fail the shard to reallocate a new IndexShard and
// create a new IndexWriter
logger.info("recovery failed for primary shadow shard, failing shard");
// pass the failure as null, as we want to ensure the store is not marked as corrupted
shard.failShard("primary relocation failed on shared filesystem", e);
} else {
logger.info("recovery failed on shared filesystem", e);
}
throw e;
}
}
@Override
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
logger.trace("skipping recovery of translog snapshot on shared filesystem");
return 0;
}
}

View File

@ -173,9 +173,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
case STILL_ALLOCATED:
// nothing to do
break;
case SHARED_FILE_SYSTEM:
// nothing to do
break;
default:
assert false : "unknown shard deletion check result: " + shardDeletionCheckResult;
}

View File

@ -190,19 +190,11 @@ public class RestShardsAction extends AbstractCatAction {
table.addCell(shard.id());
IndexMetaData indexMeta = state.getState().getMetaData().getIndexSafe(shard.index());
boolean usesShadowReplicas = false;
if (indexMeta != null) {
usesShadowReplicas = indexMeta.isIndexUsingShadowReplicas();
}
if (shard.primary()) {
table.addCell("p");
} else {
if (usesShadowReplicas) {
table.addCell("s");
} else {
table.addCell("r");
}
}
table.addCell(shard.state());
table.addCell(commonStats == null ? null : commonStats.getDocs().getCount());
table.addCell(commonStats == null ? null : commonStats.getStore().getSize());

View File

@ -132,33 +132,6 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints));
}
public void testReplicationWithShadowIndex() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId);
final ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm), listener, false,
new TestReplicaProxy(), () -> state, logger, "test");
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(Collections.emptySet()));
assertTrue("listener is not marked as done", listener.isDone());
ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
assertThat(shardInfo.getSuccessful(), equalTo(1));
assertThat(shardInfo.getTotal(), equalTo(indexShardRoutingTable.getSize()));
}
public void testDemotedPrimary() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
@ -310,7 +283,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm),
listener, randomBoolean(), new TestReplicaProxy(), () -> state, logger, "test");
listener, new TestReplicaProxy(), () -> state, logger, "test");
if (passesActiveShardCheck) {
assertThat(op.checkActiveShardCount(), nullValue());
@ -519,13 +492,14 @@ public class ReplicationOperationTests extends ESTestCase {
class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier) {
this(request, primary, listener, true, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
this(request, primary, listener, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
}
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas,
Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier, Logger logger, String opType) {
super(request, primary, listener, executeOnReplicas, replicas, clusterStateSupplier, logger, opType);
ActionListener<TestPrimary.Result> listener,
Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier,
Logger logger, String opType) {
super(request, primary, listener, replicas, clusterStateSupplier, logger, opType);
}
}

View File

@ -497,8 +497,7 @@ public class TransportReplicationActionTests extends ESTestCase {
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@ -550,8 +549,7 @@ public class TransportReplicationActionTests extends ESTestCase {
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
assertPhase(task, "primary");
@ -650,35 +648,6 @@ public class TransportReplicationActionTests extends ESTestCase {
assertEquals(0, shardFailedRequests.length);
}
public void testShadowIndexDisablesReplication() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
MetaData.Builder metaData = MetaData.builder(state.metaData());
Settings.Builder settings = Settings.builder().put(metaData.get(index).getSettings());
settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true);
metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings));
state = ClusterState.builder(state).metaData(metaData).build();
setState(clusterService, state);
AtomicBoolean executed = new AtomicBoolean();
ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard();
action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(),
createTransportChannel(new PlainActionFuture<>()), null) {
@Override
protected ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> createReplicatedOperation(
Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertFalse(executeOnReplicas);
assertFalse(executed.getAndSet(true));
return new NoopReplicationOperation(request, actionListener);
}
}.run();
assertThat(executed.get(), equalTo(true));
}
public void testSeqNoIsSetOnPrimary() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
@ -738,8 +707,7 @@ public class TransportReplicationActionTests extends ESTestCase {
createReplicatedOperation(
Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, TestResponse>> actionListener,
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
TransportReplicationAction<Request, Request, TestResponse>.PrimaryShardReference primaryShardReference) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
@ -1150,7 +1118,7 @@ public class TransportReplicationActionTests extends ESTestCase {
class NoopReplicationOperation extends ReplicationOperation<Request, Request, TestAction.PrimaryResult<Request, TestResponse>> {
NoopReplicationOperation(Request request, ActionListener<TestAction.PrimaryResult<Request, TestResponse>> listener) {
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
super(request, null, listener, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override

View File

@ -134,53 +134,6 @@ public class DiskUsageTests extends ESTestCase {
assertEquals(test1Path.getParent().getParent().getParent().toAbsolutePath().toString(), routingToPath.get(test_1));
}
public void testFillShardsWithShadowIndices() {
final Index index = new Index("non-shadow", "0xcafe0000");
ShardRouting s0 = ShardRouting.newUnassigned(new ShardId(index, 0), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
s0 = ShardRoutingHelper.initialize(s0, "node1");
s0 = ShardRoutingHelper.moveToStarted(s0);
Path i0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0");
CommonStats commonStats0 = new CommonStats();
commonStats0.store = new StoreStats(100);
final Index index2 = new Index("shadow", "0xcafe0001");
ShardRouting s1 = ShardRouting.newUnassigned(new ShardId(index2, 0), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
s1 = ShardRoutingHelper.initialize(s1, "node2");
s1 = ShardRoutingHelper.moveToStarted(s1);
Path i1Path = createTempDir().resolve("indices").resolve(index2.getUUID()).resolve("0");
CommonStats commonStats1 = new CommonStats();
commonStats1.store = new StoreStats(1000);
ShardStats[] stats = new ShardStats[] {
new ShardStats(s0, new ShardPath(false, i0Path, i0Path, s0.shardId()), commonStats0 , null, null),
new ShardStats(s1, new ShardPath(false, i1Path, i1Path, s1.shardId()), commonStats1 , null, null)
};
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
ClusterState state = ClusterState.builder(new ClusterName("blarg"))
.version(0)
.metaData(MetaData.builder()
.put(IndexMetaData.builder("non-shadow")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_INDEX_UUID, "0xcafe0000")
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0))
.put(IndexMetaData.builder("shadow")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_INDEX_UUID, "0xcafe0001")
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)))
.build();
logger.info("--> calling buildShardLevelInfo with state: {}", state);
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state);
assertEquals(2, shardSizes.size());
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(s0)));
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(s1)));
assertEquals(100L, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(s0)).longValue());
assertEquals(0L, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(s1)).longValue());
}
public void testFillDiskUsage() {
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();

View File

@ -68,7 +68,6 @@ public class IndexFolderUpgraderTests extends ESTestCase {
public void testUpgradeCustomDataPath() throws IOException {
Path customPath = createTempDir();
final Settings nodeSettings = Settings.builder()
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), randomBoolean())
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), customPath.toAbsolutePath().toString()).build();
try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
@ -97,7 +96,6 @@ public class IndexFolderUpgraderTests extends ESTestCase {
public void testPartialUpgradeCustomDataPath() throws IOException {
Path customPath = createTempDir();
final Settings nodeSettings = Settings.builder()
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), randomBoolean())
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), customPath.toAbsolutePath().toString()).build();
try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
@ -136,8 +134,7 @@ public class IndexFolderUpgraderTests extends ESTestCase {
}
public void testUpgrade() throws IOException {
final Settings nodeSettings = Settings.builder()
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), randomBoolean()).build();
final Settings nodeSettings = Settings.EMPTY;
try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
final Index index = new Index(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
Settings settings = Settings.builder()
@ -159,8 +156,7 @@ public class IndexFolderUpgraderTests extends ESTestCase {
}
public void testUpgradeIndices() throws IOException {
final Settings nodeSettings = Settings.builder()
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), randomBoolean()).build();
final Settings nodeSettings = Settings.EMPTY;
try (NodeEnvironment nodeEnv = newNodeEnvironment(nodeSettings)) {
Map<IndexSettings, Tuple<Integer, Integer>> indexSettingsMap = new HashMap<>();
for (int i = 0; i < randomIntBetween(2, 5); i++) {

View File

@ -380,11 +380,10 @@ public class NodeEnvironmentTests extends ESTestCase {
assertThat("index paths uses the regular template",
env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "nodes/0/indices/" + index.getUUID())));
IndexSettings s3 = new IndexSettings(s2.getIndexMetaData(),
Settings.builder().put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), false).build());
IndexSettings s3 = new IndexSettings(s2.getIndexMetaData(), Settings.builder().build());
assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid)));
assertThat(env.resolveCustomLocation(s3, sid), equalTo(PathUtils.get("/tmp/foo/" + index.getUUID() + "/0")));
assertThat(env.resolveCustomLocation(s3, sid), equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0")));
assertThat("shard paths with a custom data_path should contain only regular paths",
env.availableShardPaths(sid),

View File

@ -330,26 +330,10 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
final int numNodes = 2;
final List<String> nodes;
if (randomBoolean()) {
// test with a regular index
logger.info("--> starting a cluster with " + numNodes + " nodes");
nodes = internalCluster().startNodes(numNodes);
logger.info("--> create an index");
createIndex(indexName);
} else {
// test with a shadow replica index
final Path dataPath = createTempDir();
logger.info("--> created temp data path for shadow replicas [{}]", dataPath);
logger.info("--> starting a cluster with " + numNodes + " nodes");
final Settings nodeSettings = Settings.builder()
.put("node.add_lock_id_to_custom_path", false)
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath.toString())
.put("index.store.fs.fs_lock", randomFrom("native", "simple"))
.build();
nodes = internalCluster().startNodes(numNodes, nodeSettings);
logger.info("--> create a shadow replica index");
createShadowReplicaIndex(indexName, dataPath, numNodes - 1);
}
logger.info("--> waiting for green status");
ensureGreen();
@ -535,23 +519,4 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
+ ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}
/**
* Creates a shadow replica index and asserts that the index creation was acknowledged.
* Can only be invoked on a cluster where each node has been configured with shared data
* paths and the other necessary settings for shadow replicas.
*/
private void createShadowReplicaIndex(final String name, final Path dataPath, final int numReplicas) {
assert Files.exists(dataPath);
assert numReplicas >= 0;
final Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.build();
assertAcked(prepareCreate(name).setSettings(idxSettings).get());
}
}

View File

@ -72,16 +72,6 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
private final DiscoveryNode node3 = newNode("node3");
private TestAllocator testAllocator;
/**
* needed due to random usage of {@link IndexMetaData#INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING}. removed once
* shadow replicas are removed.
*/
@Override
protected boolean enableWarningsCheck() {
return false;
}
@Before
public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
@ -401,79 +391,6 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
}
/**
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
* deciders say yes, we allocate to that node.
*/
public void testRecoverOnAnyNode() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
}
/**
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
* deciders say throttle, we add it to ignored shards.
*/
public void testRecoverOnAnyNodeThrottle() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
/**
* Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy but allocation
* deciders say no, we still allocate to that node.
*/
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), "allocId");
testAllocator.addData(node1, "allocId", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
}
/**
* Tests that when recovering using "recover_on_any_node" and we don't find a node with a shard copy we let
* BalancedShardAllocator assign the shard
*/
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), "allocId");
testAllocator.addData(node1, null, randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders, String... allocIds) {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
.numberOfShards(1).numberOfReplicas(0).putInSyncAllocationIds(0, Sets.newHashSet(allocIds)))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRestore(metaData.index(shardId.getIndex()), new SnapshotRecoverySource(new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID())), Version.CURRENT, shardId.getIndexName()))
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)).build();
return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, System.nanoTime(), false);
}
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, UnassignedInfo.Reason reason,
String... activeAllocationIds) {
MetaData metaData = MetaData.builder()

View File

@ -47,36 +47,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
/** Unit test(s) for IndexService */
public class IndexServiceTests extends ESSingleNodeTestCase {
public void testDetermineShadowEngineShouldBeUsed() {
IndexSettings regularSettings = new IndexSettings(
IndexMetaData
.builder("regular")
.settings(Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build())
.build(),
Settings.EMPTY);
IndexSettings shadowSettings = new IndexSettings(
IndexMetaData
.builder("shadow")
.settings(Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build())
.build(),
Settings.EMPTY);
assertFalse("no shadow replicas for normal settings", IndexService.useShadowEngine(true, regularSettings));
assertFalse("no shadow replicas for normal settings", IndexService.useShadowEngine(false, regularSettings));
assertFalse("no shadow replicas for primary shard with shadow settings", IndexService.useShadowEngine(true, shadowSettings));
assertTrue("shadow replicas for replica shards with shadow settings",IndexService.useShadowEngine(false, shadowSettings));
}
public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
filterBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);

View File

@ -1,907 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertOrderedSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Tests for indices that use shadow replicas and a shared filesystem
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@LuceneTestCase.AwaitsFix(bugUrl = "shadow replicas will be removed in master - https://github.com/elastic/elasticsearch/pull/23906")
public class IndexWithShadowReplicasIT extends ESIntegTestCase {
private Settings nodeSettings(Path dataPath) {
return nodeSettings(dataPath.toString());
}
private Settings nodeSettings(String dataPath) {
return Settings.builder()
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), false)
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath)
.put(FsDirectoryService.INDEX_LOCK_FACTOR_SETTING.getKey(), randomFrom("native", "simple"))
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}
public void testCannotCreateWithBadPath() throws Exception {
Settings nodeSettings = nodeSettings("/badpath");
internalCluster().startNodes(1, nodeSettings);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, "/etc/foo")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build();
try {
assertAcked(prepareCreate("foo").setSettings(idxSettings));
fail("should have failed");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage(),
e.getMessage().contains("custom path [/etc/foo] is not a sub-path of path.shared_data"));
}
}
/**
* Tests the case where we create an index without shadow replicas, snapshot it and then restore into
* an index with shadow replicas enabled.
*/
public void testRestoreToShadow() throws ExecutionException, InterruptedException {
final Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
internalCluster().startNodes(3, nodeSettings);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build();
assertAcked(prepareCreate("foo").setSettings(idxSettings));
ensureGreen();
final int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("foo", "doc", ""+i).setSource("foo", "bar").get();
}
assertNoFailures(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())));
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("foo").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
Settings shadowSettings = Settings.builder()
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2).build();
logger.info("--> restore the index into shadow replica index");
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setIndexSettings(shadowSettings).setWaitForCompletion(true)
.setRenamePattern("(.+)").setRenameReplacement("$1-copy")
.execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
refresh();
Index index = resolveIndex("foo-copy");
for (IndicesService service : internalCluster().getDataNodeInstances(IndicesService.class)) {
if (service.hasIndex(index)) {
IndexShard shard = service.indexServiceSafe(index).getShardOrNull(0);
if (shard.routingEntry().primary()) {
assertFalse(shard instanceof ShadowIndexShard);
} else {
assertTrue(shard instanceof ShadowIndexShard);
}
}
}
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch("foo-copy").setQuery(matchAllQuery()).get();
assertHitCount(resp, numDocs);
}
@TestLogging("org.elasticsearch.gateway:TRACE")
public void testIndexWithFewDocuments() throws Exception {
final Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
internalCluster().startNodes(3, nodeSettings);
final String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
ensureGreen(IDX);
// So basically, the primary should fail and the replica will need to
// replay the translog, this is what this tests
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(IDX).clear().setTranslog(true).get();
assertEquals(2, indicesStatsResponse.getIndex(IDX).getPrimaries().getTranslog().estimatedNumberOfOperations());
assertEquals(2, indicesStatsResponse.getIndex(IDX).getTotal().getTranslog().estimatedNumberOfOperations());
Index index = resolveIndex(IDX);
for (IndicesService service : internalCluster().getInstances(IndicesService.class)) {
IndexService indexService = service.indexService(index);
if (indexService != null) {
IndexShard shard = indexService.getShard(0);
TranslogStats translogStats = shard.translogStats();
assertTrue(translogStats != null || shard instanceof ShadowIndexShard);
if (translogStats != null) {
assertEquals(2, translogStats.estimatedNumberOfOperations());
}
}
}
// Check that we can get doc 1 and 2, because we are doing realtime
// gets and getting from the primary
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
flushAndRefresh(IDX);
client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get();
refresh();
// Check that we can get doc 1 and 2 without realtime
gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(false).get();
gResp2 = client().prepareGet(IDX, "doc", "2").setRealtime(false).get();
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
logger.info("--> restarting all nodes");
if (randomBoolean()) {
logger.info("--> rolling restart");
internalCluster().rollingRestart();
} else {
logger.info("--> full restart");
internalCluster().fullRestart();
}
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ensureGreen(IDX);
flushAndRefresh(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 4);
logger.info("--> deleting index");
assertAcked(client().admin().indices().prepareDelete(IDX));
}
public void testReplicaToPrimaryPromotion() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
String node1 = internalCluster().startNode(nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
// Node1 has the primary, now node2 has the replica
internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
flushAndRefresh(IDX);
logger.info("--> stopping node1 [{}]", node1);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
ensureClusterSizeConsistency(); // wait for the new node to be elected and process the node leave
ensureYellow(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
gResp1 = client().prepareGet(IDX, "doc", "1").get();
gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.toString(), gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
client().prepareIndex(IDX, "doc", "1").setSource("foo", "foobar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "foobar").get();
gResp1 = client().prepareGet(IDX, "doc", "1").get();
gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.toString(), gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("foobar"));
assertThat(gResp2.getSource().get("foo"), equalTo("foobar"));
}
public void testPrimaryRelocation() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
String node1 = internalCluster().startNode(nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
flushAndRefresh(IDX);
// now prevent primary from being allocated on node 1 move to node_3
String node3 = internalCluster().startNode(nodeSettings);
Settings build = Settings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
ensureGreen(IDX);
// check if primary has relocated to node3
assertEquals(internalCluster().clusterService(node3).localNode().getId(),
client().admin().cluster().prepareState().get().getState().routingTable().index(IDX).shard(0).primaryShard().currentNodeId());
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
gResp1 = client().prepareGet(IDX, "doc", "1").get();
gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.toString(), gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get();
gResp1 = client().prepareGet(IDX, "doc", "3").setPreference("_primary").get();
gResp2 = client().prepareGet(IDX, "doc", "4").setPreference("_primary").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
}
public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
String node1 = internalCluster().startNode(nodeSettings);
final String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
flushAndRefresh(IDX);
String node3 = internalCluster().startNode(nodeSettings);
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch started = new CountDownLatch(1);
final int numPhase1Docs = scaledRandomIntBetween(25, 200);
final int numPhase2Docs = scaledRandomIntBetween(25, 200);
final CountDownLatch phase1finished = new CountDownLatch(1);
final CountDownLatch phase2finished = new CountDownLatch(1);
final CopyOnWriteArrayList<Exception> exceptions = new CopyOnWriteArrayList<>();
Thread thread = new Thread() {
@Override
public void run() {
started.countDown();
while (counter.get() < (numPhase1Docs + numPhase2Docs)) {
try {
final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
} catch (Exception e) {
exceptions.add(e);
}
final int docCount = counter.get();
if (docCount == numPhase1Docs) {
phase1finished.countDown();
}
}
logger.info("--> stopping indexing thread");
phase2finished.countDown();
}
};
thread.start();
started.await();
phase1finished.await(); // wait for a certain number of documents to be indexed
logger.info("--> excluding {} from allocation", node1);
// now prevent primary from being allocated on node 1 move to node_3
Settings build = Settings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
// wait for more documents to be indexed post-recovery, also waits for
// indexing thread to stop
phase2finished.await();
ExceptionsHelper.rethrowAndSuppress(exceptions);
ensureGreen(IDX);
thread.join();
logger.info("--> performing query");
flushAndRefresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, counter.get());
assertHitCount(resp, numPhase1Docs + numPhase2Docs);
}
public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = Settings.builder()
.put("node.add_lock_id_to_custom_path", false)
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath)
.build();
String node1 = internalCluster().startNode(nodeSettings);
final String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
flushAndRefresh(IDX);
String node3 = internalCluster().startNode(nodeSettings);
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch started = new CountDownLatch(1);
final int numPhase1Docs = scaledRandomIntBetween(25, 200);
final int numPhase2Docs = scaledRandomIntBetween(25, 200);
final int numPhase3Docs = scaledRandomIntBetween(25, 200);
final CountDownLatch phase1finished = new CountDownLatch(1);
final CountDownLatch phase2finished = new CountDownLatch(1);
final CountDownLatch phase3finished = new CountDownLatch(1);
final AtomicBoolean keepFailing = new AtomicBoolean(true);
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node3),
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (keepFailing.get() && action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) {
logger.info("--> failing translog ops");
throw new ElasticsearchException("failing on purpose");
}
super.sendRequest(connection, requestId, action, request, options);
}
});
Thread thread = new Thread() {
@Override
public void run() {
started.countDown();
while (counter.get() < (numPhase1Docs + numPhase2Docs + numPhase3Docs)) {
final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
final int docCount = counter.get();
if (docCount == numPhase1Docs) {
phase1finished.countDown();
} else if (docCount == (numPhase1Docs + numPhase2Docs)) {
phase2finished.countDown();
}
}
logger.info("--> stopping indexing thread");
phase3finished.countDown();
}
};
thread.start();
started.await();
phase1finished.await(); // wait for a certain number of documents to be indexed
logger.info("--> excluding {} from allocation", node1);
// now prevent primary from being allocated on node 1 move to node_3
Settings build = Settings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
// wait for more documents to be indexed post-recovery, also waits for
// indexing thread to stop
phase2finished.await();
// stop failing
keepFailing.set(false);
// wait for more docs to be indexed
phase3finished.await();
ensureGreen(IDX);
thread.join();
logger.info("--> performing query");
flushAndRefresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, counter.get());
}
public void testIndexWithShadowReplicasCleansUp() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
final int nodeCount = randomIntBetween(2, 5);
logger.info("--> starting {} nodes", nodeCount);
final List<String> nodes = internalCluster().startNodes(nodeCount, nodeSettings);
final String IDX = "test";
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(nodeCount);
final int numPrimaries = numPrimariesAndReplicas.v1();
final int numReplicas = numPrimariesAndReplicas.v2();
logger.info("--> creating index {} with {} primary shards and {} replicas", IDX, numPrimaries, numReplicas);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numPrimaries)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
ensureGreen(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
flushAndRefresh(IDX);
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").get();
assertThat(gResp1.getSource().get("foo"), equalTo("bar"));
assertThat(gResp2.getSource().get("foo"), equalTo("bar"));
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
logger.info("--> deleting index " + IDX);
assertAcked(client().admin().indices().prepareDelete(IDX));
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
//assertIndicesDirsDeleted(nodes);
}
/**
* Tests that shadow replicas can be "naturally" rebalanced and relocated
* around the cluster. By "naturally" I mean without using the reroute API
*/
// This test failed on CI when trying to assert that all the shard data has been deleted
// from the index path. It has not been reproduced locally. Despite the IndicesService
// deleting the index and hence, deleting all the shard data for the index, the test
// failure still showed some Lucene files in the data directory for that index. Not sure
// why that is, so turning on more logging here.
@TestLogging("org.elasticsearch.indices:TRACE,org.elasticsearch.env:TRACE,_root:DEBUG")
public void testShadowReplicaNaturalRelocation() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
final List<String> nodes = internalCluster().startNodes(2, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
ensureGreen(IDX);
int docCount = randomIntBetween(10, 100);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
}
indexRandom(true, true, true, builders);
flushAndRefresh(IDX);
// start a third node, with 5 shards each on the other nodes, they
// should relocate some to the third node
final String node3 = internalCluster().startNode(nodeSettings);
nodes.add(node3);
assertBusy(new Runnable() {
@Override
public void run() {
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node has {} shards (needs at least 2)", node.numberOfOwningShards());
assertThat("at least 2 shards on node", node.numberOfOwningShards(), greaterThanOrEqualTo(2));
}
}
});
ensureYellow(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, docCount);
assertAcked(client().admin().indices().prepareDelete(IDX));
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
//assertIndicesDirsDeleted(nodes);
}
public void testShadowReplicasUsingFieldData() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
internalCluster().startNodes(3, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=keyword").get();
ensureGreen(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "foo").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
flushAndRefresh(IDX);
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addDocValueField("foo").addSort("foo", SortOrder.ASC).get();
assertHitCount(resp, 4);
assertOrderedSearchHits(resp, "2", "3", "4", "1");
SearchHit[] hits = resp.getHits().getHits();
assertThat(hits[0].field("foo").getValue().toString(), equalTo("bar"));
assertThat(hits[1].field("foo").getValue().toString(), equalTo("baz"));
assertThat(hits[2].field("foo").getValue().toString(), equalTo("eggplant"));
assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
}
/** wait until none of the nodes have shards allocated on them */
private void assertNoShardsOn(final List<String> nodeList) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
if (nodeList.contains(node.node().getName())) {
assertThat("no shards on node", node.numberOfOwningShards(), equalTo(0));
}
}
}
}, 1, TimeUnit.MINUTES);
}
/** wait until the node has the specified number of shards allocated on it */
private void assertShardCountOn(final String nodeName, final int shardCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node {} has {} shards", node.node().getName(), node.numberOfOwningShards());
if (nodeName.equals(node.node().getName())) {
assertThat(node.numberOfOwningShards(), equalTo(shardCount));
}
}
}
}, 1, TimeUnit.MINUTES);
}
public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
Settings fooSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "foo").build();
Settings barSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "bar").build();
List<String> allNodes = internalCluster().startNodes(fooSettings, fooSettings, barSettings, barSettings);
List<String> fooNodes = allNodes.subList(0, 2);
List<String> barNodes = allNodes.subList(2, 4);
String IDX = "test";
Settings includeFoo = Settings.builder()
.put("index.routing.allocation.include.affinity", "foo")
.build();
Settings includeBar = Settings.builder()
.put("index.routing.allocation.include.affinity", "bar")
.build();
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)
.put(includeFoo) // start with requiring the shards on "foo"
.build();
// only one node, so all primaries will end up on node1
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=keyword").get();
ensureGreen(IDX);
// Index some documents
client().prepareIndex(IDX, "doc", "1").setSource("foo", "foo").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
flushAndRefresh(IDX);
// put shards on "bar"
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeBar).get();
// wait for the shards to move from "foo" nodes to "bar" nodes
assertNoShardsOn(fooNodes);
// put shards back on "foo"
client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeFoo).get();
// wait for the shards to move from "bar" nodes to "foo" nodes
assertNoShardsOn(barNodes);
// Stop a foo node
logger.info("--> stopping first 'foo' node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(0)));
// Ensure that the other foo node has all the shards now
assertShardCountOn(fooNodes.get(1), 5);
// Assert no shards on the "bar" nodes
assertNoShardsOn(barNodes);
// Stop the second "foo" node
logger.info("--> stopping second 'foo' node");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(1)));
// The index should still be able to be allocated (on the "bar" nodes),
// all the "foo" nodes are gone
ensureGreen(IDX);
// Start another "foo" node and make sure the index moves back
logger.info("--> starting additional 'foo' node");
String newFooNode = internalCluster().startNode(fooSettings);
assertShardCountOn(newFooNode, 5);
assertNoShardsOn(barNodes);
}
public void testDeletingClosedIndexRemovesFiles() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath.getParent());
final int numNodes = randomIntBetween(2, 5);
logger.info("--> starting {} nodes", numNodes);
final List<String> nodes = internalCluster().startNodes(numNodes, nodeSettings);
final String IDX = "test";
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(numNodes);
final int numPrimaries = numPrimariesAndReplicas.v1();
final int numReplicas = numPrimariesAndReplicas.v2();
logger.info("--> creating index {} with {} primary shards and {} replicas", IDX, numPrimaries, numReplicas);
assert numPrimaries > 0;
assert numReplicas >= 0;
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numPrimaries)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
ensureGreen(IDX);
int docCount = randomIntBetween(10, 100);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
}
indexRandom(true, true, true, builders);
flushAndRefresh(IDX);
logger.info("--> closing index {}", IDX);
client().admin().indices().prepareClose(IDX).get();
ensureGreen(IDX);
logger.info("--> deleting closed index");
client().admin().indices().prepareDelete(IDX).get();
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
assertIndicesDirsDeleted(nodes);
}
public void testNodeJoinsWithoutShadowReplicaConfigured() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = nodeSettings(dataPath);
internalCluster().startNodes(2, nodeSettings);
String IDX = "test";
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
flushAndRefresh(IDX);
internalCluster().startNodes(1);
ensureYellow(IDX);
final ClusterHealthResponse clusterHealth = client().admin().cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.execute()
.actionGet();
assertThat(clusterHealth.getNumberOfNodes(), equalTo(3));
// the new node is not configured for a shadow replica index, so no shards should have been assigned to it
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
}
private static void assertIndicesDirsDeleted(final List<String> nodes) throws IOException {
for (String node : nodes) {
final NodeEnvironment nodeEnv = internalCluster().getInstance(NodeEnvironment.class, node);
assertThat(nodeEnv.availableIndexFolders(), equalTo(Collections.emptySet()));
}
}
private static Tuple<Integer, Integer> randomPrimariesAndReplicas(final int numNodes) {
final int numPrimaries;
final int numReplicas;
if (randomBoolean()) {
// test with some nodes having no shards
numPrimaries = 1;
numReplicas = randomIntBetween(0, numNodes - 2);
} else {
// test with all nodes having at least one shard
numPrimaries = randomIntBetween(1, 5);
numReplicas = numNodes - 1;
}
return Tuple.tuple(numPrimaries, numReplicas);
}
}

View File

@ -410,7 +410,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, true, new ReplicasRef(), () -> null, logger, opType) {
}, new ReplicasRef(), () -> null, logger, opType) {
@Override
protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
return replicationGroup.shardRoutings();

View File

@ -243,11 +243,6 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}, null, config);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
throw new UnsupportedOperationException();
}
};
} else {
return null;

View File

@ -111,15 +111,10 @@ public class ShardPathTests extends ESTestCase {
final Path customPath;
if (useCustomDataPath) {
final Path path = createTempDir();
final boolean includeNodeId = randomBoolean();
indexSettings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build();
nodeSettings = Settings.builder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath())
.put(NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.getKey(), includeNodeId).build();
if (includeNodeId) {
.build();
customPath = path.resolve("custom").resolve("0");
} else {
customPath = path.resolve("custom");
}
} else {
customPath = null;
indexSettings = indexSettingsBuilder.build();

View File

@ -39,33 +39,6 @@ import java.util.Arrays;
public class FsDirectoryServiceTests extends ESTestCase {
public void testHasSleepWrapperOnSharedFS() throws IOException {
Settings build = randomBoolean() ?
Settings.builder().put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true).build() :
Settings.builder().put(IndexMetaData.SETTING_SHADOW_REPLICAS, true).build();;
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
IndexStore store = new IndexStore(settings);
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path);
Directory directory = fsDirectoryService.newDirectory();
assertTrue(directory.getClass().toString(), directory instanceof SleepingLockWrapper);
}
public void testHasNoSleepWrapperOnNormalFS() throws IOException {
Settings build = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "simplefs").build();
IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build);
IndexStore store = new IndexStore(settings);
Path tempDir = createTempDir().resolve(settings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(settings.getIndex(), 0));
FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path);
Directory directory = fsDirectoryService.newDirectory();
assertFalse(directory instanceof SleepingLockWrapper);
assertTrue(directory instanceof SimpleFSDirectory);
}
public void testPreload() throws IOException {
doTestPreload();
doTestPreload("nvd", "dvd", "tim");

View File

@ -106,23 +106,6 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
return true;
}
public void testCanDeleteIndexContent() throws IOException {
final IndicesService indicesService = getIndicesService();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_DATA_PATH, "/foo/bar")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 3))
.build());
assertFalse("shard on shared filesystem", indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings));
final IndexMetaData.Builder newIndexMetaData = IndexMetaData.builder(idxSettings.getIndexMetaData());
newIndexMetaData.state(IndexMetaData.State.CLOSE);
idxSettings = IndexSettingsModule.newIndexSettings(newIndexMetaData.build());
assertTrue("shard on shared filesystem, but closed, so it should be deletable",
indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings));
}
public void testCanDeleteShardContent() {
IndicesService indicesService = getIndicesService();
IndexMetaData meta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(

View File

@ -91,15 +91,6 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
terminate(threadPool);
}
/**
* needed due to random usage of {@link IndexMetaData#INDEX_SHADOW_REPLICAS_SETTING}. removed once
* shadow replicas are removed.
*/
@Override
protected boolean enableWarningsCheck() {
return false;
}
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
@ -249,10 +240,6 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2));
if (randomBoolean()) {
settingsBuilder.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true);
}
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));

View File

@ -40,11 +40,6 @@ index settings, aliases, mappings, and index templates.
* <<indices-analyze>>
* <<indices-templates>>
[float]
[[shadow-replicas]]
== Replica configurations
* <<indices-shadow-replicas>>
[float]
[[monitoring]]
== Monitoring:
@ -95,8 +90,6 @@ include::indices/analyze.asciidoc[]
include::indices/templates.asciidoc[]
include::indices/shadow-replicas.asciidoc[]
include::indices/stats.asciidoc[]
include::indices/segments.asciidoc[]

View File

@ -1,124 +0,0 @@
[[indices-shadow-replicas]]
== Shadow replica indices
deprecated[5.2.0, Shadow replicas don't see much usage and we are planning to remove them]
If you would like to use a shared filesystem, you can use the shadow replicas
settings to choose where on disk the data for an index should be kept, as well
as how Elasticsearch should replay operations on all the replica shards of an
index.
In order to fully utilize the `index.data_path` and `index.shadow_replicas`
settings, you need to allow Elasticsearch to use the same data directory for
multiple instances by setting `node.add_lock_id_to_custom_path` to false in
elasticsearch.yml:
[source,yaml]
--------------------------------------------------
node.add_lock_id_to_custom_path: false
--------------------------------------------------
You will also need to indicate to the security manager where the custom indices
will be, so that the correct permissions can be applied. You can do this by
setting the `path.shared_data` setting in elasticsearch.yml:
[source,yaml]
--------------------------------------------------
path.shared_data: /opt/data
--------------------------------------------------
This means that Elasticsearch can read and write to files in any subdirectory of
the `path.shared_data` setting.
You can then create an index with a custom data path, where each node will use
this path for the data:
[WARNING]
========================
Because shadow replicas do not index the document on replica shards, it's
possible for the replica's known mapping to be behind the index's known mapping
if the latest cluster state has not yet been processed on the node containing
the replica. Because of this, it is highly recommended to use pre-defined
mappings when using shadow replicas.
========================
[source,js]
--------------------------------------------------
PUT /my_index
{
"index" : {
"number_of_shards" : 1,
"number_of_replicas" : 4,
"data_path": "/opt/data/my_index",
"shadow_replicas": true
}
}
--------------------------------------------------
// CONSOLE
// TEST[skip:no way to configure path.shared_data for /opt/data]
[WARNING]
========================
In the above example, the "/opt/data/my_index" path is a shared filesystem that
must be available on every node in the Elasticsearch cluster. You must also
ensure that the Elasticsearch process has the correct permissions to read from
and write to the directory used in the `index.data_path` setting.
========================
The `data_path` does not have to contain the index name, in this case,
"my_index" was used but it could easily also have been "/opt/data/"
An index that has been created with the `index.shadow_replicas` setting set to
"true" will not replicate document operations to any of the replica shards,
instead, it will only continually refresh. Once segments are available on the
filesystem where the shadow replica resides (after an Elasticsearch "flush"), a
regular refresh (governed by the `index.refresh_interval`) can be used to make
the new data searchable.
NOTE: Since documents are only indexed on the primary shard, realtime GET
requests could fail to return a document if executed on the replica shard,
therefore, GET API requests automatically have the `?preference=_primary` flag
set if there is no preference flag already set.
In order to ensure the data is being synchronized in a fast enough manner, you
may need to tune the flush threshold for the index to a desired number. A flush
is needed to fsync segment files to disk, so they will be visible to all other
replica nodes. Users should test what flush threshold levels they are
comfortable with, as increased flushing can impact indexing performance.
The Elasticsearch cluster will still detect the loss of a primary shard, and
transform the replica into a primary in this situation. This transformation will
take slightly longer, since no `IndexWriter` is maintained for each shadow
replica.
Below is the list of settings that can be changed using the update
settings API:
`index.data_path` (string)::
Path to use for the index's data. Note that by default Elasticsearch will
append the node ordinal by default to the path to ensure multiple instances
of Elasticsearch on the same machine do not share a data directory.
`index.shadow_replicas`::
Boolean value indicating this index should use shadow replicas. Defaults to
`false`.
`index.shared_filesystem`::
Boolean value indicating this index uses a shared filesystem. Defaults to
the `true` if `index.shadow_replicas` is set to true, `false` otherwise.
`index.shared_filesystem.recover_on_any_node`::
Boolean value indicating whether the primary shards for the index should be
allowed to recover on any node in the cluster. If a node holding a copy of
the shard is found, recovery prefers that node. Defaults to `false`.
=== Node level settings related to shadow replicas
These are non-dynamic settings that need to be configured in `elasticsearch.yml`
`node.add_lock_id_to_custom_path`::
Boolean setting indicating whether Elasticsearch should append the node's
ordinal to the custom data path. For example, if this is enabled and a path
of "/tmp/foo" is used, the first locally-running node will use "/tmp/foo/0",
the second will use "/tmp/foo/1", the third "/tmp/foo/2", etc. Defaults to
`true`.

View File

@ -29,6 +29,11 @@ PUT _template/template_2
// CONSOLE
=== Shadow Replicas are deprecated
=== Shadow Replicas have been removed
<<indices-shadow-replicas,Shadow Replicas>> don't see much usage and we are planning to remove them.
Shadow replicas don't see enough usage, and have been removed. This includes the
following settings:
- `index.shared_filesystem`
- `index.shadow_replicas`
- `node.add_lock_id_to_custom_path`

View File

@ -169,33 +169,6 @@
$body: |
/^(index2 \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){5}$/
---
"Test cat shards with shadow replicas":
- skip:
version: " - 5.1.99"
reason: deprecation was added in 5.2.0
features: "warnings"
- do:
indices.create:
index: index3
body:
settings:
number_of_shards: "1"
number_of_replicas: "1"
shadow_replicas: true
shared_filesystem: false
warnings:
- "[index.shadow_replicas] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
- "[index.shared_filesystem] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
- do:
cat.shards:
index: index3
- match:
$body: |
/^(index3 \s+ \d \s+ (p|s) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){2}$/
---
"Test cat shards using wildcards":

View File

@ -35,9 +35,4 @@ public final class MockEngineFactory implements EngineFactory {
public Engine newReadWriteEngine(EngineConfig config) {
return new MockInternalEngine(config, wrapper);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
return new MockShadowEngine(config, wrapper);
}
}

View File

@ -47,7 +47,7 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Support class to build MockEngines like {@link org.elasticsearch.test.engine.MockInternalEngine} or {@link org.elasticsearch.test.engine.MockShadowEngine}
* Support class to build MockEngines like {@link org.elasticsearch.test.engine.MockInternalEngine}
* since they need to subclass the actual engine
*/
public final class MockEngineSupport {

View File

@ -29,12 +29,10 @@ import java.io.IOException;
final class MockInternalEngine extends InternalEngine {
private MockEngineSupport support;
private final boolean randomizeFlushOnClose;
private Class<? extends FilterDirectoryReader> wrapperClass;
MockInternalEngine(EngineConfig config, Class<? extends FilterDirectoryReader> wrapper) throws EngineException {
super(config);
randomizeFlushOnClose = config.getIndexSettings().isOnSharedFilesystem() == false;
wrapperClass = wrapper;
}
@ -61,7 +59,6 @@ final class MockInternalEngine extends InternalEngine {
@Override
public void flushAndClose() throws IOException {
if (randomizeFlushOnClose) {
switch (support().flushOrClose(MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
case FLUSH_AND_CLOSE:
flushAndCloseInternal();
@ -70,9 +67,6 @@ final class MockInternalEngine extends InternalEngine {
super.close();
break;
}
} else {
flushAndCloseInternal();
}
}
private void flushAndCloseInternal() throws IOException {

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.engine;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.ShadowEngine;
final class MockShadowEngine extends ShadowEngine {
private final MockEngineSupport support;
MockShadowEngine(EngineConfig config, Class<? extends FilterDirectoryReader> wrapper) {
super(config);
this.support = new MockEngineSupport(config, wrapper);
}
@Override
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
final Searcher engineSearcher = super.newSearcher(source, searcher, manager);
return support.wrapSearcher(source, engineSearcher, searcher, manager);
}
}

View File

@ -103,7 +103,7 @@ public class MockFSIndexStore extends IndexStore {
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
if (currentState == IndexShardState.CLOSED && validCheckIndexStates.contains(previousState) && indexShard.indexSettings().isOnSharedFilesystem() == false) {
if (currentState == IndexShardState.CLOSED && validCheckIndexStates.contains(previousState)) {
shardSet.put(indexShard, Boolean.TRUE);
}