Improvements to the IndicesService class
This commit contains the following improvements/fixes: 1. Renaming method names and variables to better reflect the purpose of the method and the semantics of the variable. 2. For deleting indexes, replace the closed parameter passed to the delete index/store methods with obtaining the index's state from the IndexSettings that is already passed in. 3. Added tests to the IndexWithShadowReplicaIT suite, some of which show issues in the shadow replica delete process that are captured in Github issue 17695. Closes #17638
This commit is contained in:
parent
c9cb1de6cb
commit
b87fd54ba9
|
@ -1151,7 +1151,6 @@
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesLifecycleListenerIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesLifecycleListenerIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesLifecycleListenerSingleNodeTests.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesLifecycleListenerSingleNodeTests.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesOptionsIntegrationIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesOptionsIntegrationIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesServiceTests.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltAnalyzerIntegrationIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltAnalyzerIntegrationIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]AnalyzeActionIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]AnalyzeActionIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]HunspellServiceIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]HunspellServiceIT.java" checks="LineLength" />
|
||||||
|
|
|
@ -29,22 +29,11 @@ import java.io.Reader;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.CharsetDecoder;
|
import java.nio.charset.CharsetDecoder;
|
||||||
import java.nio.file.DirectoryNotEmptyException;
|
|
||||||
import java.nio.file.DirectoryStream;
|
import java.nio.file.DirectoryStream;
|
||||||
import java.nio.file.FileAlreadyExistsException;
|
|
||||||
import java.nio.file.FileVisitResult;
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.SimpleFileVisitor;
|
|
||||||
import java.nio.file.StandardCopyOption;
|
|
||||||
import java.nio.file.attribute.BasicFileAttributes;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
import static java.nio.file.FileVisitResult.CONTINUE;
|
|
||||||
import static java.nio.file.FileVisitResult.SKIP_SUBTREE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Elasticsearch utils to work with {@link java.nio.file.Path}
|
* Elasticsearch utils to work with {@link java.nio.file.Path}
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -190,8 +190,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
});
|
});
|
||||||
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
|
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
|
||||||
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
|
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -459,7 +457,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
final IndexSettings indexSettings = indexService.getIndexSettings();
|
final IndexSettings indexSettings = indexService.getIndexSettings();
|
||||||
listener.afterIndexDeleted(indexService.index(), indexSettings.getSettings());
|
listener.afterIndexDeleted(indexService.index(), indexSettings.getSettings());
|
||||||
// now we are done - try to wipe data on disk if possible
|
// now we are done - try to wipe data on disk if possible
|
||||||
deleteIndexStore(reason, indexService.index(), indexSettings, false);
|
deleteIndexStore(reason, indexService.index(), indexSettings);
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new ElasticsearchException("failed to remove index " + index, ex);
|
throw new ElasticsearchException("failed to remove index " + index, ex);
|
||||||
|
@ -515,17 +513,21 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
removeIndex(index, reason, true);
|
removeIndex(index, reason, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteClosedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) {
|
/**
|
||||||
|
* Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index
|
||||||
|
* but does not deal with in-memory structures. For those call {@link #deleteIndex(Index, String)}
|
||||||
|
*/
|
||||||
|
public void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) {
|
||||||
if (nodeEnv.hasNodeFile()) {
|
if (nodeEnv.hasNodeFile()) {
|
||||||
String indexName = metaData.getIndex().getName();
|
String indexName = metaData.getIndex().getName();
|
||||||
try {
|
try {
|
||||||
if (clusterState.metaData().hasIndex(indexName)) {
|
if (clusterState.metaData().hasIndex(indexName)) {
|
||||||
final IndexMetaData index = clusterState.metaData().index(indexName);
|
final IndexMetaData index = clusterState.metaData().index(indexName);
|
||||||
throw new IllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getIndexUUID() + "] [" + metaData.getIndexUUID() + "]");
|
throw new IllegalStateException("Can't delete unassigned index store for [" + indexName + "] - it's still part of the cluster state [" + index.getIndexUUID() + "] [" + metaData.getIndexUUID() + "]");
|
||||||
}
|
}
|
||||||
deleteIndexStore(reason, metaData, clusterState, true);
|
deleteIndexStore(reason, metaData, clusterState);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("[{}] failed to delete closed index", e, metaData.getIndex());
|
logger.warn("[{}] failed to delete unassigned index (reason [{}])", e, metaData.getIndex(), reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -533,8 +535,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
/**
|
/**
|
||||||
* Deletes the index store trying to acquire all shards locks for this index.
|
* Deletes the index store trying to acquire all shards locks for this index.
|
||||||
* This method will delete the metadata for the index even if the actual shards can't be locked.
|
* This method will delete the metadata for the index even if the actual shards can't be locked.
|
||||||
|
*
|
||||||
|
* Package private for testing
|
||||||
*/
|
*/
|
||||||
public void deleteIndexStore(String reason, IndexMetaData metaData, ClusterState clusterState, boolean closed) throws IOException {
|
void deleteIndexStore(String reason, IndexMetaData metaData, ClusterState clusterState) throws IOException {
|
||||||
if (nodeEnv.hasNodeFile()) {
|
if (nodeEnv.hasNodeFile()) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
Index index = metaData.getIndex();
|
Index index = metaData.getIndex();
|
||||||
|
@ -547,22 +551,25 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
// we do not delete the store if it is a master eligible node and the index is still in the cluster state
|
// we do not delete the store if it is a master eligible node and the index is still in the cluster state
|
||||||
// because we want to keep the meta data for indices around even if no shards are left here
|
// because we want to keep the meta data for indices around even if no shards are left here
|
||||||
final IndexMetaData idxMeta = clusterState.metaData().index(index.getName());
|
final IndexMetaData idxMeta = clusterState.metaData().index(index.getName());
|
||||||
throw new IllegalStateException("Can't delete closed index store for [" + index.getName() + "] - it's still part of the cluster state [" + idxMeta.getIndexUUID() + "] [" + metaData.getIndexUUID() + "]");
|
throw new IllegalStateException("Can't delete index store for [" + index.getName() + "] - it's still part of the " +
|
||||||
|
"cluster state [" + idxMeta.getIndexUUID() + "] [" + metaData.getIndexUUID() + "], " +
|
||||||
|
"we are master eligible, so will keep the index metadata even if no shards are left.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final IndexSettings indexSettings = buildIndexSettings(metaData);
|
final IndexSettings indexSettings = buildIndexSettings(metaData);
|
||||||
deleteIndexStore(reason, indexSettings.getIndex(), indexSettings, closed);
|
deleteIndexStore(reason, indexSettings.getIndex(), indexSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteIndexStore(String reason, Index index, IndexSettings indexSettings, boolean closed) throws IOException {
|
private void deleteIndexStore(String reason, Index index, IndexSettings indexSettings) throws IOException {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
// we are trying to delete the index store here - not a big deal if the lock can't be obtained
|
// we are trying to delete the index store here - not a big deal if the lock can't be obtained
|
||||||
// the store metadata gets wiped anyway even without the lock this is just best effort since
|
// the store metadata gets wiped anyway even without the lock this is just best effort since
|
||||||
// every shards deletes its content under the shard lock it owns.
|
// every shards deletes its content under the shard lock it owns.
|
||||||
logger.debug("{} deleting index store reason [{}]", index, reason);
|
logger.debug("{} deleting index store reason [{}]", index, reason);
|
||||||
if (canDeleteIndexContents(index, indexSettings, closed)) {
|
if (canDeleteIndexContents(index, indexSettings)) {
|
||||||
|
// its safe to delete all index metadata and shard data
|
||||||
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
|
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
|
@ -617,11 +624,11 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
logger.debug("{} deleted shard reason [{}]", shardId, reason);
|
logger.debug("{} deleted shard reason [{}]", shardId, reason);
|
||||||
|
|
||||||
if (clusterState.nodes().getLocalNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
|
if (clusterState.nodes().getLocalNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
|
||||||
canDeleteIndexContents(shardId.getIndex(), indexSettings, false)) {
|
canDeleteIndexContents(shardId.getIndex(), indexSettings)) {
|
||||||
if (nodeEnv.findAllShardIds(shardId.getIndex()).isEmpty()) {
|
if (nodeEnv.findAllShardIds(shardId.getIndex()).isEmpty()) {
|
||||||
try {
|
try {
|
||||||
// note that deleteIndexStore have more safety checks and may throw an exception if index was concurrently created.
|
// note that deleteIndexStore have more safety checks and may throw an exception if index was concurrently created.
|
||||||
deleteIndexStore("no longer used", metaData, clusterState, false);
|
deleteIndexStore("no longer used", metaData, clusterState);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// wrap the exception to indicate we already deleted the shard
|
// wrap the exception to indicate we already deleted the shard
|
||||||
throw new ElasticsearchException("failed to delete unused index after deleting its last shard (" + shardId + ")", e);
|
throw new ElasticsearchException("failed to delete unused index after deleting its last shard (" + shardId + ")", e);
|
||||||
|
@ -633,18 +640,19 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method returns true if the current node is allowed to delete the
|
* This method returns true if the current node is allowed to delete the given index.
|
||||||
* given index. If the index uses a shared filesystem this method always
|
* This is the case if the index is deleted in the metadata or there is no allocation
|
||||||
* returns false.
|
* on the local node and the index isn't on a shared file system.
|
||||||
* @param index {@code Index} to check whether deletion is allowed
|
* @param index {@code Index} to check whether deletion is allowed
|
||||||
* @param indexSettings {@code IndexSettings} for the given index
|
* @param indexSettings {@code IndexSettings} for the given index
|
||||||
* @return true if the index can be deleted on this node
|
* @return true if the index can be deleted on this node
|
||||||
*/
|
*/
|
||||||
public boolean canDeleteIndexContents(Index index, IndexSettings indexSettings, boolean closed) {
|
public boolean canDeleteIndexContents(Index index, IndexSettings indexSettings) {
|
||||||
|
// index contents can be deleted if the index is not on a shared file system,
|
||||||
|
// or if its on a shared file system but its an already closed index (so all
|
||||||
|
// its resources have already been relinquished)
|
||||||
|
if (indexSettings.isOnSharedFilesystem() == false || indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE) {
|
||||||
final IndexService indexService = indexService(index);
|
final IndexService indexService = indexService(index);
|
||||||
// Closed indices may be deleted, even if they are on a shared
|
|
||||||
// filesystem. Since it is closed we aren't deleting it for relocation
|
|
||||||
if (indexSettings.isOnSharedFilesystem() == false || closed) {
|
|
||||||
if (indexService == null && nodeEnv.hasNodeFile()) {
|
if (indexService == null && nodeEnv.hasNodeFile()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -236,7 +236,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
} else {
|
} else {
|
||||||
final IndexMetaData metaData = previousState.metaData().getIndexSafe(index);
|
final IndexMetaData metaData = previousState.metaData().getIndexSafe(index);
|
||||||
indexSettings = new IndexSettings(metaData, settings);
|
indexSettings = new IndexSettings(metaData, settings);
|
||||||
indicesService.deleteClosedIndex("closed index no longer part of the metadata", metaData, event.state());
|
indicesService.deleteUnassignedIndex("closed index no longer part of the metadata", metaData, event.state());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, indexSettings, localNodeId);
|
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, indexSettings, localNodeId);
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
|
@ -29,16 +30,21 @@ import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.ShadowIndexShard;
|
import org.elasticsearch.index.shard.ShadowIndexShard;
|
||||||
|
import org.elasticsearch.index.store.FsDirectoryService;
|
||||||
import org.elasticsearch.index.translog.TranslogStats;
|
import org.elasticsearch.index.translog.TranslogStats;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryTargetService;
|
import org.elasticsearch.indices.recovery.RecoveryTargetService;
|
||||||
|
@ -59,6 +65,7 @@ import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -87,9 +94,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
|
|
||||||
private Settings nodeSettings(String dataPath) {
|
private Settings nodeSettings(String dataPath) {
|
||||||
return Settings.builder()
|
return Settings.builder()
|
||||||
.put("node.add_id_to_custom_path", false)
|
.put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), false)
|
||||||
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath)
|
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath)
|
||||||
.put("index.store.fs.fs_lock", randomFrom("native", "simple"))
|
.put(FsDirectoryService.INDEX_LOCK_FACTOR_SETTING.getKey(), randomFrom("native", "simple"))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -543,13 +550,18 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
Path dataPath = createTempDir();
|
Path dataPath = createTempDir();
|
||||||
Settings nodeSettings = nodeSettings(dataPath);
|
Settings nodeSettings = nodeSettings(dataPath);
|
||||||
|
|
||||||
int nodeCount = randomIntBetween(2, 5);
|
final int nodeCount = randomIntBetween(2, 5);
|
||||||
internalCluster().startNodesAsync(nodeCount, nodeSettings).get();
|
logger.info("--> starting {} nodes", nodeCount);
|
||||||
String IDX = "test";
|
final List<String> nodes = internalCluster().startNodesAsync(nodeCount, nodeSettings).get();
|
||||||
|
final String IDX = "test";
|
||||||
|
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(nodeCount);
|
||||||
|
final int numPrimaries = numPrimariesAndReplicas.v1();
|
||||||
|
final int numReplicas = numPrimariesAndReplicas.v2();
|
||||||
|
logger.info("--> creating index {} with {} primary shards and {} replicas", IDX, numPrimaries, numReplicas);
|
||||||
|
|
||||||
Settings idxSettings = Settings.builder()
|
Settings idxSettings = Settings.builder()
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numPrimaries)
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, nodeCount - 1))
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas)
|
||||||
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
||||||
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
||||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||||
|
@ -557,6 +569,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
|
|
||||||
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
|
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
|
||||||
ensureGreen(IDX);
|
ensureGreen(IDX);
|
||||||
|
|
||||||
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
|
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
|
||||||
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
|
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
|
||||||
flushAndRefresh(IDX);
|
flushAndRefresh(IDX);
|
||||||
|
@ -570,9 +583,13 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
||||||
assertHitCount(resp, 2);
|
assertHitCount(resp, 2);
|
||||||
|
|
||||||
|
logger.info("--> deleting index " + IDX);
|
||||||
assertAcked(client().admin().indices().prepareDelete(IDX));
|
assertAcked(client().admin().indices().prepareDelete(IDX));
|
||||||
|
|
||||||
assertPathHasBeenCleared(dataPath);
|
assertPathHasBeenCleared(dataPath);
|
||||||
|
//norelease
|
||||||
|
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
|
||||||
|
//assertIndicesDirsDeleted(nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -583,7 +600,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
Path dataPath = createTempDir();
|
Path dataPath = createTempDir();
|
||||||
Settings nodeSettings = nodeSettings(dataPath);
|
Settings nodeSettings = nodeSettings(dataPath);
|
||||||
|
|
||||||
internalCluster().startNodesAsync(2, nodeSettings).get();
|
final List<String> nodes = internalCluster().startNodesAsync(2, nodeSettings).get();
|
||||||
String IDX = "test";
|
String IDX = "test";
|
||||||
|
|
||||||
Settings idxSettings = Settings.builder()
|
Settings idxSettings = Settings.builder()
|
||||||
|
@ -608,6 +625,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
// start a third node, with 5 shards each on the other nodes, they
|
// start a third node, with 5 shards each on the other nodes, they
|
||||||
// should relocate some to the third node
|
// should relocate some to the third node
|
||||||
final String node3 = internalCluster().startNode(nodeSettings);
|
final String node3 = internalCluster().startNode(nodeSettings);
|
||||||
|
nodes.add(node3);
|
||||||
|
|
||||||
assertBusy(new Runnable() {
|
assertBusy(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -630,6 +648,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
assertAcked(client().admin().indices().prepareDelete(IDX));
|
assertAcked(client().admin().indices().prepareDelete(IDX));
|
||||||
|
|
||||||
assertPathHasBeenCleared(dataPath);
|
assertPathHasBeenCleared(dataPath);
|
||||||
|
//norelease
|
||||||
|
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
|
||||||
|
//assertIndicesDirsDeleted(nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testShadowReplicasUsingFieldData() throws Exception {
|
public void testShadowReplicasUsingFieldData() throws Exception {
|
||||||
|
@ -779,49 +800,104 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void testDeletingClosedIndexRemovesFiles() throws Exception {
|
public void testDeletingClosedIndexRemovesFiles() throws Exception {
|
||||||
Path dataPath = createTempDir();
|
Path dataPath = createTempDir();
|
||||||
Path dataPath2 = createTempDir();
|
|
||||||
Settings nodeSettings = nodeSettings(dataPath.getParent());
|
Settings nodeSettings = nodeSettings(dataPath.getParent());
|
||||||
|
|
||||||
internalCluster().startNodesAsync(2, nodeSettings).get();
|
final int numNodes = randomIntBetween(2, 5);
|
||||||
String IDX = "test";
|
logger.info("--> starting {} nodes", numNodes);
|
||||||
String IDX2 = "test2";
|
final List<String> nodes = internalCluster().startNodesAsync(numNodes, nodeSettings).get();
|
||||||
|
final String IDX = "test";
|
||||||
|
final Tuple<Integer, Integer> numPrimariesAndReplicas = randomPrimariesAndReplicas(numNodes);
|
||||||
|
final int numPrimaries = numPrimariesAndReplicas.v1();
|
||||||
|
final int numReplicas = numPrimariesAndReplicas.v2();
|
||||||
|
logger.info("--> creating index {} with {} primary shards and {} replicas", IDX, numPrimaries, numReplicas);
|
||||||
|
|
||||||
|
assert numPrimaries > 0;
|
||||||
|
assert numReplicas >= 0;
|
||||||
Settings idxSettings = Settings.builder()
|
Settings idxSettings = Settings.builder()
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numPrimaries)
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numReplicas)
|
||||||
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
||||||
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
||||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||||
.build();
|
.build();
|
||||||
Settings idx2Settings = Settings.builder()
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
|
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
|
||||||
.put(IndexMetaData.SETTING_DATA_PATH, dataPath2.toAbsolutePath().toString())
|
|
||||||
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
|
||||||
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
|
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
|
||||||
prepareCreate(IDX2).setSettings(idx2Settings).addMapping("doc", "foo", "type=text").get();
|
ensureGreen(IDX);
|
||||||
ensureGreen(IDX, IDX2);
|
|
||||||
|
|
||||||
int docCount = randomIntBetween(10, 100);
|
int docCount = randomIntBetween(10, 100);
|
||||||
List<IndexRequestBuilder> builders = new ArrayList<>();
|
List<IndexRequestBuilder> builders = new ArrayList<>();
|
||||||
for (int i = 0; i < docCount; i++) {
|
for (int i = 0; i < docCount; i++) {
|
||||||
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
|
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
|
||||||
builders.add(client().prepareIndex(IDX2, "doc", i + "").setSource("foo", "bar"));
|
|
||||||
}
|
}
|
||||||
indexRandom(true, true, true, builders);
|
indexRandom(true, true, true, builders);
|
||||||
flushAndRefresh(IDX, IDX2);
|
flushAndRefresh(IDX);
|
||||||
|
|
||||||
logger.info("--> closing index {}", IDX);
|
logger.info("--> closing index {}", IDX);
|
||||||
client().admin().indices().prepareClose(IDX).get();
|
client().admin().indices().prepareClose(IDX).get();
|
||||||
|
ensureGreen(IDX);
|
||||||
|
|
||||||
logger.info("--> deleting non-closed index");
|
|
||||||
client().admin().indices().prepareDelete(IDX2).get();
|
|
||||||
assertPathHasBeenCleared(dataPath2);
|
|
||||||
logger.info("--> deleting closed index");
|
logger.info("--> deleting closed index");
|
||||||
client().admin().indices().prepareDelete(IDX).get();
|
client().admin().indices().prepareDelete(IDX).get();
|
||||||
|
|
||||||
assertPathHasBeenCleared(dataPath);
|
assertPathHasBeenCleared(dataPath);
|
||||||
|
assertIndicesDirsDeleted(nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNodeJoinsWithoutShadowReplicaConfigured() throws Exception {
|
||||||
|
Path dataPath = createTempDir();
|
||||||
|
Settings nodeSettings = nodeSettings(dataPath);
|
||||||
|
|
||||||
|
internalCluster().startNodesAsync(2, nodeSettings).get();
|
||||||
|
String IDX = "test";
|
||||||
|
|
||||||
|
Settings idxSettings = Settings.builder()
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||||
|
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
||||||
|
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
||||||
|
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=text").get();
|
||||||
|
ensureYellow(IDX);
|
||||||
|
|
||||||
|
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
|
||||||
|
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
|
||||||
|
flushAndRefresh(IDX);
|
||||||
|
|
||||||
|
internalCluster().startNodesAsync(1).get();
|
||||||
|
ensureYellow(IDX);
|
||||||
|
|
||||||
|
final ClusterHealthResponse clusterHealth = client().admin().cluster()
|
||||||
|
.prepareHealth()
|
||||||
|
.setWaitForEvents(Priority.LANGUID)
|
||||||
|
.execute()
|
||||||
|
.actionGet();
|
||||||
|
assertThat(clusterHealth.getNumberOfNodes(), equalTo(3));
|
||||||
|
// the new node is not configured for a shadow replica index, so no shards should have been assigned to it
|
||||||
|
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertIndicesDirsDeleted(final List<String> nodes) throws IOException {
|
||||||
|
for (String node : nodes) {
|
||||||
|
final NodeEnvironment nodeEnv = internalCluster().getInstance(NodeEnvironment.class, node);
|
||||||
|
assertThat(nodeEnv.availableIndexFolders(), equalTo(Collections.emptySet()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Tuple<Integer, Integer> randomPrimariesAndReplicas(final int numNodes) {
|
||||||
|
final int numPrimaries;
|
||||||
|
final int numReplicas;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
// test with some nodes having no shards
|
||||||
|
numPrimaries = 1;
|
||||||
|
numReplicas = randomIntBetween(0, numNodes - 2);
|
||||||
|
} else {
|
||||||
|
// test with all nodes having at least one shard
|
||||||
|
numPrimaries = randomIntBetween(1, 5);
|
||||||
|
numReplicas = numNodes - 1;
|
||||||
|
}
|
||||||
|
return Tuple.tuple(numPrimaries, numReplicas);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,8 +63,13 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 3))
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 3))
|
||||||
.build());
|
.build());
|
||||||
assertFalse("shard on shared filesystem", indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings, false));
|
assertFalse("shard on shared filesystem", indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings));
|
||||||
assertTrue("shard on shared filesystem and closed", indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings, true));
|
|
||||||
|
final IndexMetaData.Builder newIndexMetaData = IndexMetaData.builder(idxSettings.getIndexMetaData());
|
||||||
|
newIndexMetaData.state(IndexMetaData.State.CLOSE);
|
||||||
|
idxSettings = IndexSettingsModule.newIndexSettings(newIndexMetaData.build());
|
||||||
|
assertTrue("shard on shared filesystem, but closed, so it should be deletable",
|
||||||
|
indicesService.canDeleteIndexContents(idxSettings.getIndex(), idxSettings));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCanDeleteShardContent() {
|
public void testCanDeleteShardContent() {
|
||||||
|
@ -81,7 +86,8 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
test.removeShard(0, "boom");
|
test.removeShard(0, "boom");
|
||||||
assertTrue("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
|
assertTrue("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
|
||||||
ShardId notAllocated = new ShardId(test.index(), 100);
|
ShardId notAllocated = new ShardId(test.index(), 100);
|
||||||
assertFalse("shard that was never on this node should NOT be deletable", indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()));
|
assertFalse("shard that was never on this node should NOT be deletable",
|
||||||
|
indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteIndexStore() throws Exception {
|
public void testDeleteIndexStore() throws Exception {
|
||||||
|
@ -92,7 +98,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
assertTrue(test.hasShard(0));
|
assertTrue(test.hasShard(0));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state(), false);
|
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state());
|
||||||
fail();
|
fail();
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
// all good
|
// all good
|
||||||
|
@ -119,7 +125,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
assertTrue(path.exists());
|
assertTrue(path.exists());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
indicesService.deleteIndexStore("boom", secondMetaData, clusterService.state(), false);
|
indicesService.deleteIndexStore("boom", secondMetaData, clusterService.state());
|
||||||
fail();
|
fail();
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
// all good
|
// all good
|
||||||
|
@ -129,7 +135,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
// now delete the old one and make sure we resolve against the name
|
// now delete the old one and make sure we resolve against the name
|
||||||
try {
|
try {
|
||||||
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state(), false);
|
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state());
|
||||||
fail();
|
fail();
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
// all good
|
// all good
|
||||||
|
@ -187,4 +193,5 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
assertAcked(client().admin().indices().prepareOpen("test"));
|
assertAcked(client().admin().indices().prepareOpen("test"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,4 +65,13 @@ public class IndexSettingsModule extends AbstractModule {
|
||||||
}
|
}
|
||||||
return new IndexSettings(metaData, Settings.EMPTY, (idx) -> Regex.simpleMatch(idx, metaData.getIndex().getName()), new IndexScopedSettings(Settings.EMPTY, settingSet));
|
return new IndexSettings(metaData, Settings.EMPTY, (idx) -> Regex.simpleMatch(idx, metaData.getIndex().getName()), new IndexScopedSettings(Settings.EMPTY, settingSet));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static IndexSettings newIndexSettings(final IndexMetaData indexMetaData, Setting<?>... setting) {
|
||||||
|
Set<Setting<?>> settingSet = new HashSet<>(IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
|
||||||
|
if (setting.length > 0) {
|
||||||
|
settingSet.addAll(Arrays.asList(setting));
|
||||||
|
}
|
||||||
|
return new IndexSettings(indexMetaData, Settings.EMPTY, (idx) -> Regex.simpleMatch(idx, indexMetaData.getIndex().getName()),
|
||||||
|
new IndexScopedSettings(Settings.EMPTY, settingSet));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue