use index uuid as folder name to decouple index folder name from index name
This commit is contained in:
parent
3daa83b2d2
commit
2b18a3ce1d
|
@ -70,7 +70,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.unmodifiableSet;
|
||||
|
||||
|
@ -89,7 +88,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
* not running on Linux, or we hit an exception trying), True means the device possibly spins and False means it does not. */
|
||||
public final Boolean spins;
|
||||
|
||||
public NodePath(Path path, Environment environment) throws IOException {
|
||||
public NodePath(Path path) throws IOException {
|
||||
this.path = path;
|
||||
this.indicesPath = path.resolve(INDICES_FOLDER);
|
||||
this.fileStore = Environment.getFileStore(path);
|
||||
|
@ -102,16 +101,18 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
|
||||
/**
|
||||
* Resolves the given shards directory against this NodePath
|
||||
* ${data.paths}/nodes/{node.id}/indices/{index.uuid}/{shard.id}
|
||||
*/
|
||||
public Path resolve(ShardId shardId) {
|
||||
return resolve(shardId.getIndex()).resolve(Integer.toString(shardId.id()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the given indexes directory against this NodePath
|
||||
* Resolves index directory against this NodePath
|
||||
* ${data.paths}/nodes/{node.id}/indices/{index.uuid}
|
||||
*/
|
||||
public Path resolve(Index index) {
|
||||
return indicesPath.resolve(index.getName());
|
||||
return indicesPath.resolve(index.getUUID());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,7 +132,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
|
||||
private final int localNodeId;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final Map<ShardLockKey, InternalShardLock> shardLocks = new HashMap<>();
|
||||
private final Map<ShardId, InternalShardLock> shardLocks = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Maximum number of data nodes that should run in an environment.
|
||||
|
@ -186,7 +187,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
|
||||
try {
|
||||
locks[dirIndex] = luceneDir.obtainLock(NODE_LOCK_FILENAME);
|
||||
nodePaths[dirIndex] = new NodePath(dir, environment);
|
||||
nodePaths[dirIndex] = new NodePath(dir);
|
||||
localNodeId = possibleLockId;
|
||||
} catch (LockObtainFailedException ex) {
|
||||
logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath());
|
||||
|
@ -445,11 +446,11 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
* @param indexSettings settings for the index being deleted
|
||||
*/
|
||||
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
|
||||
final Path[] indexPaths = indexPaths(index.getName());
|
||||
final Path[] indexPaths = indexPaths(index);
|
||||
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
|
||||
IOUtils.rm(indexPaths);
|
||||
if (indexSettings.hasCustomDataPath()) {
|
||||
Path customLocation = resolveCustomLocation(indexSettings, index.getName());
|
||||
Path customLocation = resolveIndexCustomLocation(indexSettings);
|
||||
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
|
||||
IOUtils.rm(customLocation);
|
||||
}
|
||||
|
@ -517,17 +518,16 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
*/
|
||||
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws IOException {
|
||||
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
|
||||
final ShardLockKey shardLockKey = new ShardLockKey(shardId);
|
||||
final InternalShardLock shardLock;
|
||||
final boolean acquired;
|
||||
synchronized (shardLocks) {
|
||||
if (shardLocks.containsKey(shardLockKey)) {
|
||||
shardLock = shardLocks.get(shardLockKey);
|
||||
if (shardLocks.containsKey(shardId)) {
|
||||
shardLock = shardLocks.get(shardId);
|
||||
shardLock.incWaitCount();
|
||||
acquired = false;
|
||||
} else {
|
||||
shardLock = new InternalShardLock(shardLockKey);
|
||||
shardLocks.put(shardLockKey, shardLock);
|
||||
shardLock = new InternalShardLock(shardId);
|
||||
shardLocks.put(shardId, shardLock);
|
||||
acquired = true;
|
||||
}
|
||||
}
|
||||
|
@ -547,7 +547,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
@Override
|
||||
protected void closeInternal() {
|
||||
shardLock.release();
|
||||
logger.trace("released shard lock for [{}]", shardLockKey);
|
||||
logger.trace("released shard lock for [{}]", shardId);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -559,51 +559,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
*/
|
||||
public Set<ShardId> lockedShards() {
|
||||
synchronized (shardLocks) {
|
||||
Set<ShardId> lockedShards = shardLocks.keySet().stream()
|
||||
.map(shardLockKey -> new ShardId(new Index(shardLockKey.indexName, "_na_"), shardLockKey.shardId)).collect(Collectors.toSet());
|
||||
return unmodifiableSet(lockedShards);
|
||||
}
|
||||
}
|
||||
|
||||
// a key for the shard lock. we can't use shardIds, because the contain
|
||||
// the index uuid, but we want the lock semantics to the same as we map indices to disk folders, i.e., without the uuid (for now).
|
||||
private final class ShardLockKey {
|
||||
final String indexName;
|
||||
final int shardId;
|
||||
|
||||
public ShardLockKey(final ShardId shardId) {
|
||||
this.indexName = shardId.getIndexName();
|
||||
this.shardId = shardId.id();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + indexName + "][" + shardId + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardLockKey that = (ShardLockKey) o;
|
||||
|
||||
if (shardId != that.shardId) {
|
||||
return false;
|
||||
}
|
||||
return indexName.equals(that.indexName);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = indexName.hashCode();
|
||||
result = 31 * result + shardId;
|
||||
return result;
|
||||
return unmodifiableSet(new HashSet<>(shardLocks.keySet()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -616,10 +572,10 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
*/
|
||||
private final Semaphore mutex = new Semaphore(1);
|
||||
private int waitCount = 1; // guarded by shardLocks
|
||||
private final ShardLockKey lockKey;
|
||||
private final ShardId shardId;
|
||||
|
||||
InternalShardLock(ShardLockKey id) {
|
||||
lockKey = id;
|
||||
InternalShardLock(ShardId shardId) {
|
||||
this.shardId = shardId;
|
||||
mutex.acquireUninterruptibly();
|
||||
}
|
||||
|
||||
|
@ -639,10 +595,10 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
synchronized (shardLocks) {
|
||||
assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
|
||||
--waitCount;
|
||||
logger.trace("shard lock wait count for [{}] is now [{}]", lockKey, waitCount);
|
||||
logger.trace("shard lock wait count for {} is now [{}]", shardId, waitCount);
|
||||
if (waitCount == 0) {
|
||||
logger.trace("last shard lock wait decremented, removing lock for [{}]", lockKey);
|
||||
InternalShardLock remove = shardLocks.remove(lockKey);
|
||||
logger.trace("last shard lock wait decremented, removing lock for {}", shardId);
|
||||
InternalShardLock remove = shardLocks.remove(shardId);
|
||||
assert remove != null : "Removed lock was null";
|
||||
}
|
||||
}
|
||||
|
@ -651,11 +607,11 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
void acquire(long timeoutInMillis) throws LockObtainFailedException{
|
||||
try {
|
||||
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
|
||||
throw new LockObtainFailedException("Can't lock shard " + lockKey + ", timed out after " + timeoutInMillis + "ms");
|
||||
throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new LockObtainFailedException("Can't lock shard " + lockKey + ", interrupted", e);
|
||||
throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -698,11 +654,11 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
/**
|
||||
* Returns all index paths.
|
||||
*/
|
||||
public Path[] indexPaths(String indexName) {
|
||||
public Path[] indexPaths(Index index) {
|
||||
assert assertEnvIsLocked();
|
||||
Path[] indexPaths = new Path[nodePaths.length];
|
||||
for (int i = 0; i < nodePaths.length; i++) {
|
||||
indexPaths[i] = nodePaths[i].indicesPath.resolve(indexName);
|
||||
indexPaths[i] = nodePaths[i].resolve(index);
|
||||
}
|
||||
return indexPaths;
|
||||
}
|
||||
|
@ -725,25 +681,47 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
return shardLocations;
|
||||
}
|
||||
|
||||
public Set<String> findAllIndices() throws IOException {
|
||||
/**
|
||||
* Returns all folder names in ${data.paths}/nodes/{node.id}/indices folder
|
||||
*/
|
||||
public Set<String> availableIndexFolders() throws IOException {
|
||||
if (nodePaths == null || locks == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assert assertEnvIsLocked();
|
||||
Set<String> indices = new HashSet<>();
|
||||
Set<String> indexFolders = new HashSet<>();
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
Path indicesLocation = nodePath.indicesPath;
|
||||
if (Files.isDirectory(indicesLocation)) {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indicesLocation)) {
|
||||
for (Path index : stream) {
|
||||
if (Files.isDirectory(index)) {
|
||||
indices.add(index.getFileName().toString());
|
||||
indexFolders.add(index.getFileName().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return indices;
|
||||
return indexFolders;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves all existing paths to <code>indexFolderName</code> in ${data.paths}/nodes/{node.id}/indices
|
||||
*/
|
||||
public Path[] resolveIndexFolder(String indexFolderName) throws IOException {
|
||||
if (nodePaths == null || locks == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assert assertEnvIsLocked();
|
||||
List<Path> paths = new ArrayList<>(nodePaths.length);
|
||||
for (NodePath nodePath : nodePaths) {
|
||||
Path indexFolder = nodePath.indicesPath.resolve(indexFolderName);
|
||||
if (Files.exists(indexFolder)) {
|
||||
paths.add(indexFolder);
|
||||
}
|
||||
}
|
||||
return paths.toArray(new Path[paths.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -761,13 +739,13 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
}
|
||||
assert assertEnvIsLocked();
|
||||
final Set<ShardId> shardIds = new HashSet<>();
|
||||
String indexName = index.getName();
|
||||
final String indexUniquePathId = index.getUUID();
|
||||
for (final NodePath nodePath : nodePaths) {
|
||||
Path location = nodePath.indicesPath;
|
||||
if (Files.isDirectory(location)) {
|
||||
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(location)) {
|
||||
for (Path indexPath : indexStream) {
|
||||
if (indexName.equals(indexPath.getFileName().toString())) {
|
||||
if (indexUniquePathId.equals(indexPath.getFileName().toString())) {
|
||||
shardIds.addAll(findAllShardsForIndex(indexPath, index));
|
||||
}
|
||||
}
|
||||
|
@ -778,7 +756,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
}
|
||||
|
||||
private static Set<ShardId> findAllShardsForIndex(Path indexPath, Index index) throws IOException {
|
||||
assert indexPath.getFileName().toString().equals(index.getName());
|
||||
assert indexPath.getFileName().toString().equals(index.getUUID());
|
||||
Set<ShardId> shardIds = new HashSet<>();
|
||||
if (Files.isDirectory(indexPath)) {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
|
||||
|
@ -861,7 +839,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
*
|
||||
* @param indexSettings settings for the index
|
||||
*/
|
||||
private Path resolveCustomLocation(IndexSettings indexSettings) {
|
||||
public Path resolveBaseCustomLocation(IndexSettings indexSettings) {
|
||||
String customDataDir = indexSettings.customDataPath();
|
||||
if (customDataDir != null) {
|
||||
// This assert is because this should be caught by MetaDataCreateIndexService
|
||||
|
@ -882,10 +860,9 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
* the root path for the index.
|
||||
*
|
||||
* @param indexSettings settings for the index
|
||||
* @param indexName index to resolve the path for
|
||||
*/
|
||||
private Path resolveCustomLocation(IndexSettings indexSettings, final String indexName) {
|
||||
return resolveCustomLocation(indexSettings).resolve(indexName);
|
||||
private Path resolveIndexCustomLocation(IndexSettings indexSettings) {
|
||||
return resolveBaseCustomLocation(indexSettings).resolve(indexSettings.getUUID());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -897,7 +874,7 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
* @param shardId shard to resolve the path to
|
||||
*/
|
||||
public Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId) {
|
||||
return resolveCustomLocation(indexSettings, shardId.getIndexName()).resolve(Integer.toString(shardId.id()));
|
||||
return resolveIndexCustomLocation(indexSettings).resolve(Integer.toString(shardId.id()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -921,22 +898,24 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
|
|||
for (Path path : nodeDataPaths()) { // check node-paths are writable
|
||||
tryWriteTempFile(path);
|
||||
}
|
||||
for (String index : this.findAllIndices()) {
|
||||
for (Path path : this.indexPaths(index)) { // check index paths are writable
|
||||
Path statePath = path.resolve(MetaDataStateFormat.STATE_DIR_NAME);
|
||||
tryWriteTempFile(statePath);
|
||||
tryWriteTempFile(path);
|
||||
}
|
||||
for (ShardId shardID : this.findAllShardIds(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE))) {
|
||||
Path[] paths = this.availableShardPaths(shardID);
|
||||
for (Path path : paths) { // check shard paths are writable
|
||||
Path indexDir = path.resolve(ShardPath.INDEX_FOLDER_NAME);
|
||||
Path statePath = path.resolve(MetaDataStateFormat.STATE_DIR_NAME);
|
||||
Path translogDir = path.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
|
||||
tryWriteTempFile(indexDir);
|
||||
tryWriteTempFile(translogDir);
|
||||
tryWriteTempFile(statePath);
|
||||
tryWriteTempFile(path);
|
||||
for (String indexFolderName : this.availableIndexFolders()) {
|
||||
for (Path indexPath : this.resolveIndexFolder(indexFolderName)) { // check index paths are writable
|
||||
Path indexStatePath = indexPath.resolve(MetaDataStateFormat.STATE_DIR_NAME);
|
||||
tryWriteTempFile(indexStatePath);
|
||||
tryWriteTempFile(indexPath);
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
|
||||
for (Path shardPath : stream) {
|
||||
String fileName = shardPath.getFileName().toString();
|
||||
if (Files.isDirectory(shardPath) && fileName.chars().allMatch(Character::isDigit)) {
|
||||
Path indexDir = shardPath.resolve(ShardPath.INDEX_FOLDER_NAME);
|
||||
Path statePath = shardPath.resolve(MetaDataStateFormat.STATE_DIR_NAME);
|
||||
Path translogDir = shardPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
|
||||
tryWriteTempFile(indexDir);
|
||||
tryWriteTempFile(translogDir);
|
||||
tryWriteTempFile(statePath);
|
||||
tryWriteTempFile(shardPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.gateway;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -26,12 +27,17 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
@ -47,7 +53,7 @@ public class DanglingIndicesState extends AbstractComponent {
|
|||
private final MetaStateService metaStateService;
|
||||
private final LocalAllocateDangledIndices allocateDangledIndices;
|
||||
|
||||
private final Map<String, IndexMetaData> danglingIndices = ConcurrentCollections.newConcurrentMap();
|
||||
private final Map<Index, IndexMetaData> danglingIndices = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
@Inject
|
||||
public DanglingIndicesState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
|
||||
|
@ -74,7 +80,7 @@ public class DanglingIndicesState extends AbstractComponent {
|
|||
/**
|
||||
* The current set of dangling indices.
|
||||
*/
|
||||
Map<String, IndexMetaData> getDanglingIndices() {
|
||||
Map<Index, IndexMetaData> getDanglingIndices() {
|
||||
// This might be a good use case for CopyOnWriteHashMap
|
||||
return unmodifiableMap(new HashMap<>(danglingIndices));
|
||||
}
|
||||
|
@ -83,10 +89,16 @@ public class DanglingIndicesState extends AbstractComponent {
|
|||
* Cleans dangling indices if they are already allocated on the provided meta data.
|
||||
*/
|
||||
void cleanupAllocatedDangledIndices(MetaData metaData) {
|
||||
for (String danglingIndex : danglingIndices.keySet()) {
|
||||
if (metaData.hasIndex(danglingIndex)) {
|
||||
logger.debug("[{}] no longer dangling (created), removing from dangling list", danglingIndex);
|
||||
danglingIndices.remove(danglingIndex);
|
||||
for (Index index : danglingIndices.keySet()) {
|
||||
final IndexMetaData indexMetaData = metaData.index(index);
|
||||
if (indexMetaData != null && indexMetaData.getIndex().getName().equals(index.getName())) {
|
||||
if (indexMetaData.getIndex().getUUID().equals(index.getUUID()) == false) {
|
||||
logger.warn("[{}] can not be imported as a dangling index, as there is already another index " +
|
||||
"with the same name but a different uuid. local index will be ignored (but not deleted)", index);
|
||||
} else {
|
||||
logger.debug("[{}] no longer dangling (created), removing from dangling list", index);
|
||||
}
|
||||
danglingIndices.remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -104,36 +116,30 @@ public class DanglingIndicesState extends AbstractComponent {
|
|||
* that have state on disk, but are not part of the provided meta data, or not detected
|
||||
* as dangled already.
|
||||
*/
|
||||
Map<String, IndexMetaData> findNewDanglingIndices(MetaData metaData) {
|
||||
final Set<String> indices;
|
||||
Map<Index, IndexMetaData> findNewDanglingIndices(MetaData metaData) {
|
||||
final Set<String> excludeIndexPathIds = new HashSet<>(metaData.indices().size() + danglingIndices.size());
|
||||
for (ObjectCursor<IndexMetaData> cursor : metaData.indices().values()) {
|
||||
excludeIndexPathIds.add(cursor.value.getIndex().getUUID());
|
||||
}
|
||||
excludeIndexPathIds.addAll(danglingIndices.keySet().stream().map(Index::getUUID).collect(Collectors.toList()));
|
||||
try {
|
||||
indices = nodeEnv.findAllIndices();
|
||||
} catch (Throwable e) {
|
||||
final List<IndexMetaData> indexMetaDataList = metaStateService.loadIndicesStates(excludeIndexPathIds::contains);
|
||||
Map<Index, IndexMetaData> newIndices = new HashMap<>(indexMetaDataList.size());
|
||||
for (IndexMetaData indexMetaData : indexMetaDataList) {
|
||||
if (metaData.hasIndex(indexMetaData.getIndex().getName())) {
|
||||
logger.warn("[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata",
|
||||
indexMetaData.getIndex());
|
||||
} else {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state",
|
||||
indexMetaData.getIndex());
|
||||
newIndices.put(indexMetaData.getIndex(), indexMetaData);
|
||||
}
|
||||
}
|
||||
return newIndices;
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to list dangling indices", e);
|
||||
return emptyMap();
|
||||
}
|
||||
|
||||
Map<String, IndexMetaData> newIndices = new HashMap<>();
|
||||
for (String indexName : indices) {
|
||||
if (metaData.hasIndex(indexName) == false && danglingIndices.containsKey(indexName) == false) {
|
||||
try {
|
||||
IndexMetaData indexMetaData = metaStateService.loadIndexState(indexName);
|
||||
if (indexMetaData != null) {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state", indexName);
|
||||
if (!indexMetaData.getIndex().getName().equals(indexName)) {
|
||||
logger.info("dangled index directory name is [{}], state name is [{}], renaming to directory name", indexName, indexMetaData.getIndex());
|
||||
indexMetaData = IndexMetaData.builder(indexMetaData).index(indexName).build();
|
||||
}
|
||||
newIndices.put(indexName, indexMetaData);
|
||||
} else {
|
||||
logger.debug("[{}] dangling index directory detected, but no state found", indexName);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to load index state for detected dangled index", t, indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
return newIndices;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.IndexFolderUpgrader;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
|
@ -86,6 +87,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
try {
|
||||
ensureNoPre019State();
|
||||
pre20Upgrade();
|
||||
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
|
||||
long startNS = System.nanoTime();
|
||||
metaStateService.loadFullState();
|
||||
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
|
||||
|
@ -130,7 +132,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
for (IndexMetaData indexMetaData : newMetaData) {
|
||||
IndexMetaData indexMetaDataOnDisk = null;
|
||||
if (indexMetaData.getState().equals(IndexMetaData.State.CLOSE)) {
|
||||
indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.getIndex().getName());
|
||||
indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.getIndex());
|
||||
}
|
||||
if (indexMetaDataOnDisk != null) {
|
||||
newPreviouslyWrittenIndices.add(indexMetaDataOnDisk.getIndex());
|
||||
|
@ -158,7 +160,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
// check and write changes in indices
|
||||
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
|
||||
try {
|
||||
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData);
|
||||
metaStateService.writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData);
|
||||
} catch (Throwable e) {
|
||||
success = false;
|
||||
}
|
||||
|
@ -166,7 +168,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
}
|
||||
|
||||
danglingIndicesState.processDanglingIndices(newMetaData);
|
||||
|
||||
if (success) {
|
||||
previousMetaData = newMetaData;
|
||||
previouslyWrittenIndices = unmodifiableSet(relevantIndices);
|
||||
|
@ -233,7 +234,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
// We successfully checked all indices for backward compatibility and found no non-upgradable indices, which
|
||||
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
|
||||
for (IndexMetaData indexMetaData : updateIndexMetaData) {
|
||||
metaStateService.writeIndex("upgrade", indexMetaData, null);
|
||||
// since we still haven't upgraded the index folders, we write index state in the old folder
|
||||
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,9 +33,12 @@ import org.elasticsearch.env.NodeEnvironment;
|
|||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* Handles writing and loading both {@link MetaData} and {@link IndexMetaData}
|
||||
|
@ -45,7 +48,7 @@ public class MetaStateService extends AbstractComponent {
|
|||
static final String FORMAT_SETTING = "gateway.format";
|
||||
|
||||
static final String GLOBAL_STATE_FILE_PREFIX = "global-";
|
||||
private static final String INDEX_STATE_FILE_PREFIX = "state-";
|
||||
public static final String INDEX_STATE_FILE_PREFIX = "state-";
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
|
@ -91,14 +94,12 @@ public class MetaStateService extends AbstractComponent {
|
|||
} else {
|
||||
metaDataBuilder = MetaData.builder();
|
||||
}
|
||||
|
||||
final Set<String> indices = nodeEnv.findAllIndices();
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = loadIndexState(index);
|
||||
if (indexMetaData == null) {
|
||||
logger.debug("[{}] failed to find metadata for existing index location", index);
|
||||
} else {
|
||||
for (String indexFolderName : nodeEnv.availableIndexFolders()) {
|
||||
IndexMetaData indexMetaData = indexStateFormat.loadLatestState(logger, nodeEnv.resolveIndexFolder(indexFolderName));
|
||||
if (indexMetaData != null) {
|
||||
metaDataBuilder.put(indexMetaData, false);
|
||||
} else {
|
||||
logger.debug("[{}] failed to find metadata for existing index location", indexFolderName);
|
||||
}
|
||||
}
|
||||
return metaDataBuilder.build();
|
||||
|
@ -108,10 +109,35 @@ public class MetaStateService extends AbstractComponent {
|
|||
* Loads the index state for the provided index name, returning null if doesn't exists.
|
||||
*/
|
||||
@Nullable
|
||||
IndexMetaData loadIndexState(String index) throws IOException {
|
||||
IndexMetaData loadIndexState(Index index) throws IOException {
|
||||
return indexStateFormat.loadLatestState(logger, nodeEnv.indexPaths(index));
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads all indices states available on disk
|
||||
*/
|
||||
List<IndexMetaData> loadIndicesStates(Predicate<String> excludeIndexPathIdsPredicate) throws IOException {
|
||||
List<IndexMetaData> indexMetaDataList = new ArrayList<>();
|
||||
for (String indexFolderName : nodeEnv.availableIndexFolders()) {
|
||||
if (excludeIndexPathIdsPredicate.test(indexFolderName)) {
|
||||
continue;
|
||||
}
|
||||
IndexMetaData indexMetaData = indexStateFormat.loadLatestState(logger,
|
||||
nodeEnv.resolveIndexFolder(indexFolderName));
|
||||
if (indexMetaData != null) {
|
||||
final String indexPathId = indexMetaData.getIndex().getUUID();
|
||||
if (indexFolderName.equals(indexPathId)) {
|
||||
indexMetaDataList.add(indexMetaData);
|
||||
} else {
|
||||
throw new IllegalStateException("[" + indexFolderName+ "] invalid index folder name, rename to [" + indexPathId + "]");
|
||||
}
|
||||
} else {
|
||||
logger.debug("[{}] failed to find metadata for existing index location", indexFolderName);
|
||||
}
|
||||
}
|
||||
return indexMetaDataList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
|
||||
*/
|
||||
|
@ -129,13 +155,22 @@ public class MetaStateService extends AbstractComponent {
|
|||
/**
|
||||
* Writes the index state.
|
||||
*/
|
||||
void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
|
||||
logger.trace("[{}] writing state, reason [{}]", indexMetaData.getIndex(), reason);
|
||||
void writeIndex(String reason, IndexMetaData indexMetaData) throws IOException {
|
||||
writeIndex(reason, indexMetaData, nodeEnv.indexPaths(indexMetaData.getIndex()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the index state in <code>locations</code>, use {@link #writeGlobalState(String, MetaData)}
|
||||
* to write index state in index paths
|
||||
*/
|
||||
void writeIndex(String reason, IndexMetaData indexMetaData, Path[] locations) throws IOException {
|
||||
final Index index = indexMetaData.getIndex();
|
||||
logger.trace("[{}] writing state, reason [{}]", index, reason);
|
||||
try {
|
||||
indexStateFormat.write(indexMetaData, indexMetaData.getVersion(), nodeEnv.indexPaths(indexMetaData.getIndex().getName()));
|
||||
indexStateFormat.write(indexMetaData, indexMetaData.getVersion(), locations);
|
||||
} catch (Throwable ex) {
|
||||
logger.warn("[{}]: failed to write index state", ex, indexMetaData.getIndex());
|
||||
throw new IOException("failed to write state for [" + indexMetaData.getIndex() + "]", ex);
|
||||
logger.warn("[{}]: failed to write index state", ex, index);
|
||||
throw new IOException("failed to write state for [" + index + "]", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.io.IOException;
|
|||
import java.nio.file.FileStore;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public final class ShardPath {
|
||||
|
@ -37,22 +36,20 @@ public final class ShardPath {
|
|||
public static final String TRANSLOG_FOLDER_NAME = "translog";
|
||||
|
||||
private final Path path;
|
||||
private final String indexUUID;
|
||||
private final ShardId shardId;
|
||||
private final Path shardStatePath;
|
||||
private final boolean isCustomDataPath;
|
||||
|
||||
public ShardPath(boolean isCustomDataPath, Path dataPath, Path shardStatePath, String indexUUID, ShardId shardId) {
|
||||
public ShardPath(boolean isCustomDataPath, Path dataPath, Path shardStatePath, ShardId shardId) {
|
||||
assert dataPath.getFileName().toString().equals(Integer.toString(shardId.id())) : "dataPath must end with the shard ID but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getFileName().toString().equals(Integer.toString(shardId.id())) : "shardStatePath must end with the shard ID but didn't: " + dataPath.toString();
|
||||
assert dataPath.getParent().getFileName().toString().equals(shardId.getIndexName()) : "dataPath must end with index/shardID but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getParent().getFileName().toString().equals(shardId.getIndexName()) : "shardStatePath must end with index/shardID but didn't: " + dataPath.toString();
|
||||
assert dataPath.getParent().getFileName().toString().equals(shardId.getIndex().getUUID()) : "dataPath must end with index path id but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getParent().getFileName().toString().equals(shardId.getIndex().getUUID()) : "shardStatePath must end with index path id but didn't: " + dataPath.toString();
|
||||
if (isCustomDataPath && dataPath.equals(shardStatePath)) {
|
||||
throw new IllegalArgumentException("shard state path must be different to the data path when using custom data paths");
|
||||
}
|
||||
this.isCustomDataPath = isCustomDataPath;
|
||||
this.path = dataPath;
|
||||
this.indexUUID = indexUUID;
|
||||
this.shardId = shardId;
|
||||
this.shardStatePath = shardStatePath;
|
||||
}
|
||||
|
@ -73,10 +70,6 @@ public final class ShardPath {
|
|||
return Files.exists(path);
|
||||
}
|
||||
|
||||
public String getIndexUUID() {
|
||||
return indexUUID;
|
||||
}
|
||||
|
||||
public ShardId getShardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
@ -144,7 +137,7 @@ public final class ShardPath {
|
|||
dataPath = statePath;
|
||||
}
|
||||
logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath);
|
||||
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, indexUUID, shardId);
|
||||
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,34 +161,6 @@ public final class ShardPath {
|
|||
}
|
||||
}
|
||||
|
||||
/** Maps each path.data path to a "guess" of how many bytes the shards allocated to that path might additionally use over their
|
||||
* lifetime; we do this so a bunch of newly allocated shards won't just all go the path with the most free space at this moment. */
|
||||
private static Map<Path,Long> getEstimatedReservedBytes(NodeEnvironment env, long avgShardSizeInBytes, Iterable<IndexShard> shards) throws IOException {
|
||||
long totFreeSpace = 0;
|
||||
for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
|
||||
totFreeSpace += nodePath.fileStore.getUsableSpace();
|
||||
}
|
||||
|
||||
// Very rough heuristic of how much disk space we expect the shard will use over its lifetime, the max of current average
|
||||
// shard size across the cluster and 5% of the total available free space on this node:
|
||||
long estShardSizeInBytes = Math.max(avgShardSizeInBytes, (long) (totFreeSpace/20.0));
|
||||
|
||||
// Collate predicted (guessed!) disk usage on each path.data:
|
||||
Map<Path,Long> reservedBytes = new HashMap<>();
|
||||
for (IndexShard shard : shards) {
|
||||
Path dataPath = NodeEnvironment.shardStatePathToDataPath(shard.shardPath().getShardStatePath());
|
||||
|
||||
// Remove indices/<index>/<shardID> subdirs from the statePath to get back to the path.data/<lockID>:
|
||||
Long curBytes = reservedBytes.get(dataPath);
|
||||
if (curBytes == null) {
|
||||
curBytes = 0L;
|
||||
}
|
||||
reservedBytes.put(dataPath, curBytes + estShardSizeInBytes);
|
||||
}
|
||||
|
||||
return reservedBytes;
|
||||
}
|
||||
|
||||
public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shardId, IndexSettings indexSettings,
|
||||
long avgShardSizeInBytes, Map<Path,Integer> dataPathToShardCount) throws IOException {
|
||||
|
||||
|
@ -206,7 +171,6 @@ public final class ShardPath {
|
|||
dataPath = env.resolveCustomLocation(indexSettings, shardId);
|
||||
statePath = env.nodePaths()[0].resolve(shardId);
|
||||
} else {
|
||||
|
||||
long totFreeSpace = 0;
|
||||
for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
|
||||
totFreeSpace += nodePath.fileStore.getUsableSpace();
|
||||
|
@ -241,9 +205,7 @@ public final class ShardPath {
|
|||
statePath = bestPath.resolve(shardId);
|
||||
dataPath = statePath;
|
||||
}
|
||||
|
||||
final String indexUUID = indexSettings.getUUID();
|
||||
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, indexUUID, shardId);
|
||||
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -258,9 +220,6 @@ public final class ShardPath {
|
|||
if (shardId != null ? !shardId.equals(shardPath.shardId) : shardPath.shardId != null) {
|
||||
return false;
|
||||
}
|
||||
if (indexUUID != null ? !indexUUID.equals(shardPath.indexUUID) : shardPath.indexUUID != null) {
|
||||
return false;
|
||||
}
|
||||
if (path != null ? !path.equals(shardPath.path) : shardPath.path != null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -271,7 +230,6 @@ public final class ShardPath {
|
|||
@Override
|
||||
public int hashCode() {
|
||||
int result = path != null ? path.hashCode() : 0;
|
||||
result = 31 * result + (indexUUID != null ? indexUUID.hashCode() : 0);
|
||||
result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
@ -280,7 +238,6 @@ public final class ShardPath {
|
|||
public String toString() {
|
||||
return "ShardPath{" +
|
||||
"path=" + path +
|
||||
", indexUUID='" + indexUUID + '\'' +
|
||||
", shard=" + shardId +
|
||||
'}';
|
||||
}
|
||||
|
|
|
@ -531,7 +531,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
}
|
||||
// this is a pure protection to make sure this index doesn't get re-imported as a dangling index.
|
||||
// we should in the future rather write a tombstone rather than wiping the metadata.
|
||||
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index.getName()));
|
||||
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.SuppressForbidden;
|
|||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -36,7 +37,11 @@ import org.elasticsearch.test.IndexSettingsModule;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -129,21 +134,22 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
public void testShardLock() throws IOException {
|
||||
final NodeEnvironment env = newNodeEnvironment();
|
||||
|
||||
ShardLock fooLock = env.shardLock(new ShardId("foo", "_na_", 0));
|
||||
assertEquals(new ShardId("foo", "_na_", 0), fooLock.getShardId());
|
||||
Index index = new Index("foo", "fooUUID");
|
||||
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
||||
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
||||
|
||||
try {
|
||||
env.shardLock(new ShardId("foo", "_na_", 0));
|
||||
env.shardLock(new ShardId(index, 0));
|
||||
fail("shard is locked");
|
||||
} catch (LockObtainFailedException ex) {
|
||||
// expected
|
||||
}
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
Files.createDirectories(path.resolve("0"));
|
||||
Files.createDirectories(path.resolve("1"));
|
||||
}
|
||||
try {
|
||||
env.lockAllForIndex(new Index("foo", "_na_"), idxSettings, randomIntBetween(0, 10));
|
||||
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
||||
fail("shard 0 is locked");
|
||||
} catch (LockObtainFailedException ex) {
|
||||
// expected
|
||||
|
@ -151,11 +157,11 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
|
||||
fooLock.close();
|
||||
// can lock again?
|
||||
env.shardLock(new ShardId("foo", "_na_", 0)).close();
|
||||
env.shardLock(new ShardId(index, 0)).close();
|
||||
|
||||
List<ShardLock> locks = env.lockAllForIndex(new Index("foo", "_na_"), idxSettings, randomIntBetween(0, 10));
|
||||
List<ShardLock> locks = env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
||||
try {
|
||||
env.shardLock(new ShardId("foo", "_na_", 0));
|
||||
env.shardLock(new ShardId(index, 0));
|
||||
fail("shard is locked");
|
||||
} catch (LockObtainFailedException ex) {
|
||||
// expected
|
||||
|
@ -165,18 +171,45 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
env.close();
|
||||
}
|
||||
|
||||
public void testGetAllIndices() throws Exception {
|
||||
public void testAvailableIndexFolders() throws Exception {
|
||||
final NodeEnvironment env = newNodeEnvironment();
|
||||
final int numIndices = randomIntBetween(1, 10);
|
||||
Set<String> actualPaths = new HashSet<>();
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
for (Path path : env.indexPaths("foo" + i)) {
|
||||
Files.createDirectories(path);
|
||||
Index index = new Index("foo" + i, "fooUUID" + i);
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
Files.createDirectories(path.resolve(MetaDataStateFormat.STATE_DIR_NAME));
|
||||
actualPaths.add(path.getFileName().toString());
|
||||
}
|
||||
}
|
||||
Set<String> indices = env.findAllIndices();
|
||||
assertEquals(indices.size(), numIndices);
|
||||
|
||||
assertThat(actualPaths, equalTo(env.availableIndexFolders()));
|
||||
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
|
||||
env.close();
|
||||
}
|
||||
|
||||
public void testResolveIndexFolders() throws Exception {
|
||||
final NodeEnvironment env = newNodeEnvironment();
|
||||
final int numIndices = randomIntBetween(1, 10);
|
||||
Map<String, List<Path>> actualIndexDataPaths = new HashMap<>();
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
assertTrue(indices.contains("foo" + i));
|
||||
Index index = new Index("foo" + i, "fooUUID" + i);
|
||||
Path[] indexPaths = env.indexPaths(index);
|
||||
for (Path path : indexPaths) {
|
||||
Files.createDirectories(path);
|
||||
String fileName = path.getFileName().toString();
|
||||
List<Path> paths = actualIndexDataPaths.get(fileName);
|
||||
if (paths == null) {
|
||||
paths = new ArrayList<>();
|
||||
}
|
||||
paths.add(path);
|
||||
actualIndexDataPaths.put(fileName, paths);
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, List<Path>> actualIndexDataPathEntry : actualIndexDataPaths.entrySet()) {
|
||||
List<Path> actual = actualIndexDataPathEntry.getValue();
|
||||
Path[] actualPaths = actual.toArray(new Path[actual.size()]);
|
||||
assertThat(actualPaths, equalTo(env.resolveIndexFolder(actualIndexDataPathEntry.getKey())));
|
||||
}
|
||||
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
|
||||
env.close();
|
||||
|
@ -184,44 +217,45 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
|
||||
public void testDeleteSafe() throws IOException, InterruptedException {
|
||||
final NodeEnvironment env = newNodeEnvironment();
|
||||
ShardLock fooLock = env.shardLock(new ShardId("foo", "_na_", 0));
|
||||
assertEquals(new ShardId("foo", "_na_", 0), fooLock.getShardId());
|
||||
final Index index = new Index("foo", "fooUUID");
|
||||
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
||||
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
||||
|
||||
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
Files.createDirectories(path.resolve("0"));
|
||||
Files.createDirectories(path.resolve("1"));
|
||||
}
|
||||
|
||||
try {
|
||||
env.deleteShardDirectorySafe(new ShardId("foo", "_na_", 0), idxSettings);
|
||||
env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
|
||||
fail("shard is locked");
|
||||
} catch (LockObtainFailedException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
assertTrue(Files.exists(path.resolve("0")));
|
||||
assertTrue(Files.exists(path.resolve("1")));
|
||||
|
||||
}
|
||||
|
||||
env.deleteShardDirectorySafe(new ShardId("foo", "_na_", 1), idxSettings);
|
||||
env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings);
|
||||
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
assertTrue(Files.exists(path.resolve("0")));
|
||||
assertFalse(Files.exists(path.resolve("1")));
|
||||
}
|
||||
|
||||
try {
|
||||
env.deleteIndexDirectorySafe(new Index("foo", "_na_"), randomIntBetween(0, 10), idxSettings);
|
||||
env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings);
|
||||
fail("shard is locked");
|
||||
} catch (LockObtainFailedException ex) {
|
||||
// expected
|
||||
}
|
||||
fooLock.close();
|
||||
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
assertTrue(Files.exists(path));
|
||||
}
|
||||
|
||||
|
@ -242,7 +276,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
start.await();
|
||||
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "_na_", 0))) {
|
||||
try (ShardLock autoCloses = env.shardLock(new ShardId(index, 0))) {
|
||||
blockLatch.countDown();
|
||||
Thread.sleep(randomIntBetween(1, 10));
|
||||
}
|
||||
|
@ -257,11 +291,11 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
start.countDown();
|
||||
blockLatch.await();
|
||||
|
||||
env.deleteIndexDirectorySafe(new Index("foo", "_na_"), 5000, idxSettings);
|
||||
env.deleteIndexDirectorySafe(index, 5000, idxSettings);
|
||||
|
||||
assertNull(threadException.get());
|
||||
|
||||
for (Path path : env.indexPaths("foo")) {
|
||||
for (Path path : env.indexPaths(index)) {
|
||||
assertFalse(Files.exists(path));
|
||||
}
|
||||
latch.await();
|
||||
|
@ -300,7 +334,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
for (int i = 0; i < iters; i++) {
|
||||
int shard = randomIntBetween(0, counts.length - 1);
|
||||
try {
|
||||
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "_na_", shard), scaledRandomIntBetween(0, 10))) {
|
||||
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), scaledRandomIntBetween(0, 10))) {
|
||||
counts[shard].value++;
|
||||
countsAtomic[shard].incrementAndGet();
|
||||
assertEquals(flipFlop[shard].incrementAndGet(), 1);
|
||||
|
@ -334,37 +368,38 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||
String[] dataPaths = tmpPaths();
|
||||
NodeEnvironment env = newNodeEnvironment(dataPaths, "/tmp", Settings.EMPTY);
|
||||
|
||||
IndexSettings s1 = IndexSettingsModule.newIndexSettings("myindex", Settings.EMPTY);
|
||||
IndexSettings s2 = IndexSettingsModule.newIndexSettings("myindex", Settings.builder().put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build());
|
||||
Index index = new Index("myindex", "_na_");
|
||||
final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build();
|
||||
IndexSettings s1 = IndexSettingsModule.newIndexSettings("myindex", indexSettings);
|
||||
IndexSettings s2 = IndexSettingsModule.newIndexSettings("myindex", Settings.builder().put(indexSettings).put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build());
|
||||
Index index = new Index("myindex", "myindexUUID");
|
||||
ShardId sid = new ShardId(index, 0);
|
||||
|
||||
assertFalse("no settings should mean no custom data path", s1.hasCustomDataPath());
|
||||
assertTrue("settings with path_data should have a custom data path", s2.hasCustomDataPath());
|
||||
|
||||
assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid)));
|
||||
assertThat(env.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/0/myindex/0")));
|
||||
assertThat(env.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0")));
|
||||
|
||||
assertThat("shard paths with a custom data_path should contain only regular paths",
|
||||
env.availableShardPaths(sid),
|
||||
equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0")));
|
||||
equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/" + index.getUUID() + "/0")));
|
||||
|
||||
assertThat("index paths uses the regular template",
|
||||
env.indexPaths(index.getName()), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
|
||||
env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/" + index.getUUID())));
|
||||
|
||||
env.close();
|
||||
NodeEnvironment env2 = newNodeEnvironment(dataPaths, "/tmp",
|
||||
Settings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), false).build());
|
||||
|
||||
assertThat(env2.availableShardPaths(sid), equalTo(env2.availableShardPaths(sid)));
|
||||
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/myindex/0")));
|
||||
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/" + index.getUUID() + "/0")));
|
||||
|
||||
assertThat("shard paths with a custom data_path should contain only regular paths",
|
||||
env2.availableShardPaths(sid),
|
||||
equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0")));
|
||||
equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/" + index.getUUID() + "/0")));
|
||||
|
||||
assertThat("index paths uses the regular template",
|
||||
env2.indexPaths(index.getName()), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
|
||||
env2.indexPaths(index), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/" + index.getUUID())));
|
||||
|
||||
env2.close();
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.hamcrest.Matchers;
|
|||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -53,6 +54,47 @@ public class DanglingIndicesStateTests extends ESTestCase {
|
|||
assertTrue(danglingState.getDanglingIndices().isEmpty());
|
||||
}
|
||||
}
|
||||
public void testDanglingIndicesDiscovery() throws Exception {
|
||||
try (NodeEnvironment env = newNodeEnvironment()) {
|
||||
MetaStateService metaStateService = new MetaStateService(Settings.EMPTY, env);
|
||||
DanglingIndicesState danglingState = new DanglingIndicesState(Settings.EMPTY, env, metaStateService, null);
|
||||
|
||||
assertTrue(danglingState.getDanglingIndices().isEmpty());
|
||||
MetaData metaData = MetaData.builder().build();
|
||||
final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetaData.SETTING_INDEX_UUID, "test1UUID");
|
||||
IndexMetaData dangledIndex = IndexMetaData.builder("test1").settings(settings).build();
|
||||
metaStateService.writeIndex("test_write", dangledIndex);
|
||||
Map<Index, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
assertTrue(newDanglingIndices.containsKey(dangledIndex.getIndex()));
|
||||
metaData = MetaData.builder().put(dangledIndex, false).build();
|
||||
newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
assertFalse(newDanglingIndices.containsKey(dangledIndex.getIndex()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidIndexFolder() throws Exception {
|
||||
try (NodeEnvironment env = newNodeEnvironment()) {
|
||||
MetaStateService metaStateService = new MetaStateService(Settings.EMPTY, env);
|
||||
DanglingIndicesState danglingState = new DanglingIndicesState(Settings.EMPTY, env, metaStateService, null);
|
||||
|
||||
MetaData metaData = MetaData.builder().build();
|
||||
final String uuid = "test1UUID";
|
||||
final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetaData.SETTING_INDEX_UUID, uuid);
|
||||
IndexMetaData dangledIndex = IndexMetaData.builder("test1").settings(settings).build();
|
||||
metaStateService.writeIndex("test_write", dangledIndex);
|
||||
for (Path path : env.resolveIndexFolder(uuid)) {
|
||||
if (Files.exists(path)) {
|
||||
Files.move(path, path.resolveSibling("invalidUUID"), StandardCopyOption.ATOMIC_MOVE);
|
||||
}
|
||||
}
|
||||
try {
|
||||
danglingState.findNewDanglingIndices(metaData);
|
||||
fail("no exception thrown for invalid folder name");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("[invalidUUID] invalid index folder name, rename to [test1UUID]"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testDanglingProcessing() throws Exception {
|
||||
try (NodeEnvironment env = newNodeEnvironment()) {
|
||||
|
@ -61,15 +103,16 @@ public class DanglingIndicesStateTests extends ESTestCase {
|
|||
|
||||
MetaData metaData = MetaData.builder().build();
|
||||
|
||||
IndexMetaData dangledIndex = IndexMetaData.builder("test1").settings(indexSettings).build();
|
||||
metaStateService.writeIndex("test_write", dangledIndex, null);
|
||||
final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetaData.SETTING_INDEX_UUID, "test1UUID");
|
||||
IndexMetaData dangledIndex = IndexMetaData.builder("test1").settings(settings).build();
|
||||
metaStateService.writeIndex("test_write", dangledIndex);
|
||||
|
||||
// check that several runs when not in the metadata still keep the dangled index around
|
||||
int numberOfChecks = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numberOfChecks; i++) {
|
||||
Map<String, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
Map<Index, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
assertThat(newDanglingIndices.size(), equalTo(1));
|
||||
assertThat(newDanglingIndices.keySet(), Matchers.hasItems("test1"));
|
||||
assertThat(newDanglingIndices.keySet(), Matchers.hasItems(dangledIndex.getIndex()));
|
||||
assertTrue(danglingState.getDanglingIndices().isEmpty());
|
||||
}
|
||||
|
||||
|
@ -77,7 +120,7 @@ public class DanglingIndicesStateTests extends ESTestCase {
|
|||
danglingState.findNewAndAddDanglingIndices(metaData);
|
||||
|
||||
assertThat(danglingState.getDanglingIndices().size(), equalTo(1));
|
||||
assertThat(danglingState.getDanglingIndices().keySet(), Matchers.hasItems("test1"));
|
||||
assertThat(danglingState.getDanglingIndices().keySet(), Matchers.hasItems(dangledIndex.getIndex()));
|
||||
}
|
||||
|
||||
// simulate allocation to the metadata
|
||||
|
@ -85,35 +128,15 @@ public class DanglingIndicesStateTests extends ESTestCase {
|
|||
|
||||
// check that several runs when in the metadata, but not cleaned yet, still keeps dangled
|
||||
for (int i = 0; i < numberOfChecks; i++) {
|
||||
Map<String, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
Map<Index, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
assertTrue(newDanglingIndices.isEmpty());
|
||||
|
||||
assertThat(danglingState.getDanglingIndices().size(), equalTo(1));
|
||||
assertThat(danglingState.getDanglingIndices().keySet(), Matchers.hasItems("test1"));
|
||||
assertThat(danglingState.getDanglingIndices().keySet(), Matchers.hasItems(dangledIndex.getIndex()));
|
||||
}
|
||||
|
||||
danglingState.cleanupAllocatedDangledIndices(metaData);
|
||||
assertTrue(danglingState.getDanglingIndices().isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
public void testRenameOfIndexState() throws Exception {
|
||||
try (NodeEnvironment env = newNodeEnvironment()) {
|
||||
MetaStateService metaStateService = new MetaStateService(Settings.EMPTY, env);
|
||||
DanglingIndicesState danglingState = new DanglingIndicesState(Settings.EMPTY, env, metaStateService, null);
|
||||
|
||||
MetaData metaData = MetaData.builder().build();
|
||||
|
||||
IndexMetaData dangledIndex = IndexMetaData.builder("test1").settings(indexSettings).build();
|
||||
metaStateService.writeIndex("test_write", dangledIndex, null);
|
||||
|
||||
for (Path path : env.indexPaths("test1")) {
|
||||
Files.move(path, path.getParent().resolve("test1_renamed"));
|
||||
}
|
||||
|
||||
Map<String, IndexMetaData> newDanglingIndices = danglingState.findNewDanglingIndices(metaData);
|
||||
assertThat(newDanglingIndices.size(), equalTo(1));
|
||||
assertThat(newDanglingIndices.keySet(), Matchers.hasItems("test1_renamed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue