Revert "Add index.data_path setting"

This reverts commit b2ec19ab36.
This commit is contained in:
Lee Hinman 2014-12-17 09:39:19 +01:00
parent 8f146f9ab0
commit 853879a121
23 changed files with 159 additions and 555 deletions

View File

@ -48,9 +48,9 @@ import java.util.Set;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
/**
* A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}.

View File

@ -167,7 +167,6 @@ public class IndexMetaData {
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
public static final String SETTING_DATA_PATH = "index.data_path";
public static final String INDEX_UUID_NA_VALUE = "_na_";
// hard-coded hash function as of 2.0

View File

@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
@ -58,7 +57,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
@ -66,7 +64,6 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.river.RiverIndexName;
@ -104,13 +101,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;
@Inject
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName,
AliasValidator aliasValidator, Set<IndexTemplateFilter> indexTemplateFilters) {
super(settings);
this.environment = environment;
this.threadPool = threadPool;
@ -121,7 +116,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;
if (indexTemplateFilters.isEmpty()) {
this.indexTemplateFilter = DEFAULT_INDEX_TEMPLATE_FILTER;
@ -560,11 +554,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticsearchException {
validateIndexName(request.index(), state);
String customPath = request.settings().get(IndexMetaData.SETTING_DATA_PATH, null);
if (customPath != null && nodeEnv.isCustomPathsEnabled() == false) {
throw new IndexCreationException(new Index(request.index()),
new ElasticsearchIllegalArgumentException("custom data_paths for indices is disabled"));
}
}
private static class DefaultIndexTemplateFilter implements IndexTemplateFilter {

View File

@ -22,11 +22,13 @@ package org.elasticsearch.env;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
@ -34,15 +36,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -55,24 +55,15 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
/* ${data.paths}/nodes/{node.id}/indices */
private final Path[] nodeIndicesPaths;
private final Lock[] locks;
private final boolean addNodeId;
private final int localNodeId;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<ShardId, InternalShardLock> shardLocks = new HashMap<>();
private final boolean customPathsEnabled;
public static final String ADD_NODE_ID_TO_CUSTOM_PATH = "node.add_id_to_custom_path";
public static final String SETTING_CUSTOM_DATA_PATH_ENABLED = "node.enable_custom_paths";
@Inject
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);
addNodeId = settings.getAsBoolean(ADD_NODE_ID_TO_CUSTOM_PATH, true);
this.customPathsEnabled = settings.getAsBoolean(SETTING_CUSTOM_DATA_PATH_ENABLED, false);
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodePaths = null;
nodeIndicesPaths = null;
@ -162,8 +153,8 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
* @param shardId the id of the shard to delete to delete
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectorySafe(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
final Path[] paths = shardPaths(shardId, indexSettings);
public void deleteShardDirectorySafe(ShardId shardId) throws IOException {
final Path[] paths = shardPaths(shardId);
try (Closeable lock = shardLock(shardId)) {
IOUtils.rm(paths);
}
@ -177,12 +168,13 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
* @param index the index to delete
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, @IndexSettings Settings indexSettings) throws IOException {
// This is to ensure someone doesn't use ImmutableSettings.EMPTY
assert indexSettings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS) != null : "real index settings with a shard number must be used";
public void deleteIndexDirectorySafe(Index index) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index);
try {
final Path[] indexPaths = indexPaths(index, indexSettings);
final Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < indexPaths.length; i++) {
indexPaths[i] = nodeIndicesPaths[i].resolve(index.name());
}
IOUtils.rm(indexPaths);
} finally {
IOUtils.closeWhileHandlingException(locks);
@ -190,6 +182,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
}
/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
@ -355,39 +348,22 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
}
/**
* Returns all data paths including custom index paths
* Returns all data paths for the given index.
*/
public Path[] indexPaths(Index index, @IndexSettings Settings indexSettings) {
public Path[] indexPaths(Index index) {
assert assertEnvIsLocked();
if (hasCustomDataPath(indexSettings)) {
Path[] allPaths = new Path[nodeIndicesPaths.length + 1];
for (int i = 0; i < nodeIndicesPaths.length; i++) {
allPaths[i] = nodeIndicesPaths[i].resolve(index.name());
}
if (addNodeId) {
allPaths[nodeIndicesPaths.length] = Paths.get(indexSettings.get(IndexMetaData.SETTING_DATA_PATH),
Integer.toString(this.localNodeId));
} else {
allPaths[nodeIndicesPaths.length] = Paths.get(indexSettings.get(IndexMetaData.SETTING_DATA_PATH));
}
return allPaths;
} else {
Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < nodeIndicesPaths.length; i++) {
indexPaths[i] = nodeIndicesPaths[i].resolve(index.name());
}
return indexPaths;
}
}
/**
* Returns all paths where lucene data will be stored
* Returns all data paths for the given shards ID
*/
public Path[] shardDataPaths(ShardId shardId, @IndexSettings Settings indexSettings) {
public Path[] shardPaths(ShardId shardId) {
assert assertEnvIsLocked();
if (hasCustomDataPath(indexSettings)) {
return new Path[] {resolveCustomLocation(indexSettings, shardId)};
} else {
final Path[] nodePaths = nodeDataPaths();
final Path[] shardLocations = new Path[nodePaths.length];
for (int i = 0; i < nodePaths.length; i++) {
@ -395,27 +371,6 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
}
return shardLocations;
}
}
/**
* Returns all shard paths including custom shard path
*/
public Path[] shardPaths(ShardId shardId, @IndexSettings Settings indexSettings) {
assert assertEnvIsLocked();
final Path[] nodePaths = nodeDataPaths();
int size = hasCustomDataPath(indexSettings) ? nodePaths.length + 1 : nodePaths.length;
final Path[] shardLocations = new Path[size];
for (int i = 0; i < nodePaths.length; i++) {
shardLocations[i] = nodePaths[i].resolve(Paths.get("indices", shardId.index().name(), Integer.toString(shardId.id())));
}
if (hasCustomDataPath(indexSettings)) {
shardLocations[nodePaths.length] = resolveCustomLocation(indexSettings, shardId);
return shardLocations;
} else {
return shardLocations;
}
}
public Set<String> findAllIndices() throws Exception {
if (nodePaths == null || locks == null) {
@ -556,40 +511,4 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
return settings;
}
/** return true if custom paths are allowed for indices */
public boolean isCustomPathsEnabled() {
return customPathsEnabled;
}
/**
* @param indexSettings settings for an index
* @return true if the index has a custom data path
*/
static boolean hasCustomDataPath(@IndexSettings Settings indexSettings) {
return indexSettings.get(IndexMetaData.SETTING_DATA_PATH) != null;
}
/**
* Resolve the custom path for a index's shard.
* Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
* the root path for the index.
*
* @param indexSettings settings for the index
* @param shardId shard to resolve the path to
*/
private Path resolveCustomLocation(@IndexSettings Settings indexSettings, final ShardId shardId) {
String customDataDir = indexSettings.get(IndexMetaData.SETTING_DATA_PATH);
if (customDataDir != null) {
if (customPathsEnabled == false) {
throw new ElasticsearchIllegalArgumentException("custom data_paths for indices is disabled");
}
if (addNodeId) {
return Paths.get(customDataDir, Integer.toString(this.localNodeId), shardId.index().name(), Integer.toString(shardId.id()));
} else {
return Paths.get(customDataDir, shardId.index().name(), Integer.toString(shardId.id()));
}
} else {
return null;
}
}
}

View File

@ -47,11 +47,12 @@ import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -215,7 +216,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (currentMetaData == null) {
// a new event..., check from the state stored
try {
currentIndexMetaData = loadIndexState(indexMetaData.index(), indexMetaData.settings());
currentIndexMetaData = loadIndexState(indexMetaData.index());
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
@ -256,8 +257,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
if (nodeEnv.hasNodeFile()) {
try {
final Index idx = new Index(current.index());
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx, current.settings()));
nodeEnv.deleteIndexDirectorySafe(idx, current.settings());
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx));
nodeEnv.deleteIndexDirectorySafe(idx);
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
} catch (Exception ex) {
@ -295,7 +296,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
// already dangling, continue
continue;
}
final IndexMetaData indexMetaData = loadIndexState(indexName, ImmutableSettings.EMPTY);
final IndexMetaData indexMetaData = loadIndexState(indexName);
final Index index = new Index(indexName);
if (indexMetaData != null) {
try {
@ -308,7 +309,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index);
if (shardLocks.isEmpty()) {
// no shards - try to remove the directory
nodeEnv.deleteIndexDirectorySafe(index, indexMetaData.settings());
nodeEnv.deleteIndexDirectorySafe(index);
continue;
}
IOUtils.closeWhileHandlingException(shardLocks);
@ -322,7 +323,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
} else if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try {
nodeEnv.deleteIndexDirectorySafe(index, indexMetaData.settings());
nodeEnv.deleteIndexDirectorySafe(index);
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
} catch (Exception ex) {
@ -330,11 +331,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
danglingIndices.put(indexName,
new DanglingIndex(indexName,
threadPool.schedule(danglingTimeout,
ThreadPool.Names.SAME,
new RemoveDanglingIndex(index, indexMetaData.settings()))));
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(index))));
}
}
}
@ -348,7 +345,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
for (String indexName : danglingIndices.keySet()) {
IndexMetaData indexMetaData;
try {
indexMetaData = loadIndexState(indexName, ImmutableSettings.EMPTY);
indexMetaData = loadIndexState(indexName);
} catch (IOException ex) {
throw new ElasticsearchException("failed to load index state", ex);
}
@ -432,8 +429,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version();
final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, deleteOldFiles);
try {
writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(),
nodeEnv.indexPaths(new Index(indexMetaData.index()), indexMetaData.settings()));
writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(), nodeEnv.indexPaths(new Index(indexMetaData.index())));
} catch (Throwable ex) {
logger.warn("[{}]: failed to write index state", ex, indexMetaData.index());
throw new IOException("failed to write state for [" + indexMetaData.index() + "]", ex);
@ -462,7 +458,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
final Set<String> indices = nodeEnv.findAllIndices();
for (String index : indices) {
IndexMetaData indexMetaData = loadIndexState(index, ImmutableSettings.EMPTY);
IndexMetaData indexMetaData = loadIndexState(index);
if (indexMetaData == null) {
logger.debug("[{}] failed to find metadata for existing index location", index);
} else {
@ -473,9 +469,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
@Nullable
private IndexMetaData loadIndexState(String index, @IndexSettings Settings indexSettings) throws IOException {
return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true),
INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index), indexSettings));
private IndexMetaData loadIndexState(String index) throws IOException {
return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true), INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
}
private MetaData loadGlobalState() throws IOException {
@ -546,11 +541,9 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
class RemoveDanglingIndex implements Runnable {
private final Index index;
private final Settings indexSettings;
RemoveDanglingIndex(Index index, @IndexSettings Settings indexSettings) {
RemoveDanglingIndex(Index index) {
this.index = index;
this.indexSettings = indexSettings;
}
@Override
@ -564,8 +557,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
logger.warn("[{}] deleting dangling index", index);
try {
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index, indexSettings));
nodeEnv.deleteIndexDirectorySafe(index, indexSettings);
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
nodeEnv.deleteIndexDirectorySafe(index);
} catch (Exception ex) {
logger.debug("failed to delete dangling index", ex);
}

View File

@ -26,28 +26,18 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.io.*;
import java.nio.file.*;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -66,8 +56,7 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
private volatile Map<ShardId, ShardStateInfo> currentState = Maps.newHashMap();
@Inject
public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv,
TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
if (listGatewayStartedShards != null) { // for testing
@ -77,7 +66,7 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
try {
ensureNoPre019State();
long start = System.currentTimeMillis();
currentState = loadShardsStateInfo(ImmutableSettings.EMPTY);
currentState = loadShardsStateInfo();
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
} catch (Exception e) {
logger.error("failed to read local state (started shards), exiting...", e);
@ -86,8 +75,8 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
}
}
public ShardStateInfo loadShardInfo(ShardId shardId, @IndexSettings Settings indexSettings) throws Exception {
return loadShardStateInfo(shardId, indexSettings);
public ShardStateInfo loadShardInfo(ShardId shardId) throws Exception {
return loadShardStateInfo(shardId);
}
@Override
@ -100,7 +89,7 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
RoutingNode routingNode = state.readOnlyRoutingNodes().node(state.nodes().localNodeId());
final Map<ShardId, ShardStateInfo> newState;
if (routingNode != null) {
newState = persistRoutingNodeState(routingNode, event.state());
newState = persistRoutingNodeState(routingNode);
} else {
newState = Maps.newHashMap();
}
@ -129,15 +118,14 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
}
}
Map<ShardId, ShardStateInfo> persistRoutingNodeState(RoutingNode routingNode, ClusterState clusterState) {
Map<ShardId, ShardStateInfo> persistRoutingNodeState(RoutingNode routingNode) {
final Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
ShardId shardId = shardRouting.shardId();
ShardStateInfo shardStateInfo = new ShardStateInfo(shardRouting.version(), shardRouting.primary());
final ShardStateInfo previous = currentState.get(shardId);
Settings indexSettings = clusterState.getMetaData().indices().get(shardId.getIndex()).settings();
if (maybeWriteShardState(shardId, shardStateInfo, previous, indexSettings)) {
if(maybeWriteShardState(shardId, shardStateInfo, previous) ) {
newState.put(shardId, shardStateInfo);
} else if (previous != null) {
currentState.put(shardId, previous);
@ -151,23 +139,20 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
return currentState;
}
boolean maybeWriteShardState(ShardId shardId, ShardStateInfo shardStateInfo, ShardStateInfo previousState,
@IndexSettings Settings indexSettings) {
boolean maybeWriteShardState(ShardId shardId, ShardStateInfo shardStateInfo, ShardStateInfo previousState) {
final String writeReason;
if (previousState == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (previousState.version < shardStateInfo.version) {
writeReason = "version changed from [" + previousState.version + "] to [" + shardStateInfo.version + "]";
} else {
logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" +
previousState.version + "] current version [" + shardStateInfo.version + "]");
assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId +
" previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]");
assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
return previousState.version == shardStateInfo.version;
}
try {
writeShardState(writeReason, shardId, shardStateInfo, previousState, indexSettings);
writeShardState(writeReason, shardId, shardStateInfo, previousState);
} catch (Exception e) {
logger.warn("failed to write shard state for shard " + shardId, e);
// we failed to write the shard state, we will try and write
@ -177,12 +162,12 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
}
private Map<ShardId, ShardStateInfo> loadShardsStateInfo(@IndexSettings Settings indexSettings) throws Exception {
private Map<ShardId, ShardStateInfo> loadShardsStateInfo() throws Exception {
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
long highestVersion = -1;
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
for (ShardId shardId : shardIds) {
ShardStateInfo shardStateInfo = loadShardStateInfo(shardId, indexSettings);
ShardStateInfo shardStateInfo = loadShardStateInfo(shardId);
if (shardStateInfo == null) {
continue;
}
@ -196,18 +181,14 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
return shardsState;
}
private ShardStateInfo loadShardStateInfo(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN,
shardId.toString(), nodeEnv.shardPaths(shardId, indexSettings));
private ShardStateInfo loadShardStateInfo(ShardId shardId) throws IOException {
return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN, shardId.toString(), nodeEnv.shardPaths(shardId));
}
private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo,
@Nullable ShardStateInfo previousStateInfo, @IndexSettings Settings indexSettings) throws Exception {
private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception {
logger.trace("{} writing shard state, reason [{}]", shardId, reason);
final boolean deleteOldFiles = previousStateInfo != null && previousStateInfo.version != shardStateInfo.version;
MetaDataStateFormat<ShardStateInfo> stateFormat = newShardStateInfoFormat(deleteOldFiles);
stateFormat.write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version,
nodeEnv.shardPaths(shardId, indexSettings));
newShardStateInfoFormat(deleteOldFiles).write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version, nodeEnv.shardPaths(shardId));
}
private MetaDataStateFormat<ShardStateInfo> newShardStateInfoFormat(boolean deleteOldFiles) {

View File

@ -31,14 +31,14 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.io.*;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

View File

@ -27,13 +27,11 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
@ -119,10 +117,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticsearchException {
try {
ShardId shardId = request.shardId;
IndexMetaData indexMetaData = clusterService.state().getMetaData().indices().get(shardId.getIndex());
Settings indexSettings = indexMetaData == null ? ImmutableSettings.EMPTY : indexMetaData.settings();
ShardStateInfo shardStateInfo = shardsState.loadShardInfo(request.shardId, indexSettings);
ShardStateInfo shardStateInfo = shardsState.loadShardInfo(request.shardId);
if (shardStateInfo != null) {
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateInfo.version);
}

View File

@ -245,7 +245,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds();
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, settingsService.getSettings(), listener);
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) {
try {
removeShard(shardId, reason, innerListener);
@ -448,7 +448,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
store.close(new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
listener.onShardClosed(shardId, settingsService.getSettings());
listener.onShardClosed(shardId);
}
});
}
@ -460,7 +460,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
} catch (Throwable t) {
if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store
listener.onShardCloseFailed(sId, settingsService.getSettings(), t);
listener.onShardCloseFailed(sId, t);
}
throw t;
}
@ -472,40 +472,37 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
final List<Throwable> failures;
private final Set<Integer> shardIds;
private final IndicesService.IndexCloseListener listener;
private final Settings indexSettings;
public PerShardIndexCloseListener(Set<Integer> shardIds, @IndexSettings Settings indexSettings,
IndicesService.IndexCloseListener listener) {
public PerShardIndexCloseListener(Set<Integer> shardIds, IndicesService.IndexCloseListener listener) {
this.shardIds = shardIds;
this.listener = listener;
countDown = new CountDown(shardIds.size());
failures = new CopyOnWriteArrayList<>();
this.indexSettings = indexSettings;
}
@Override
public void onAllShardsClosed(Index index, @IndexSettings Settings indexSettings, List<Throwable> failures) {
public void onAllShardsClosed(Index index, List<Throwable> failures) {
assert false : "nobody should call this";
}
@Override
public void onShardClosed(ShardId shardId, @IndexSettings Settings indexSettings) {
public void onShardClosed(ShardId shardId) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardClosed(shardId, indexSettings);
listener.onShardClosed(shardId);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), indexSettings, failures);
listener.onAllShardsClosed(shardId.index(), failures);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, @IndexSettings Settings indexSettings, Throwable t) {
public void onShardCloseFailed(ShardId shardId, Throwable t) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardCloseFailed(shardId, indexSettings, t);
listener.onShardCloseFailed(shardId, t);
failures.add(t);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), indexSettings, failures);
listener.onAllShardsClosed(shardId.index(), failures);
}
}
}

View File

@ -20,8 +20,6 @@
package org.elasticsearch.index.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
@ -46,22 +44,14 @@ public interface IndexStore extends Closeable {
/**
* Returns <tt>true</tt> if this shard is allocated on this node. Allocated means
* that it has storage files that can be deleted using {@code deleteUnallocated(ShardId, Settings)}.
* that it has storage files that can be deleted using {@link #deleteUnallocated(org.elasticsearch.index.shard.ShardId)}.
*/
boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings);
boolean canDeleteUnallocated(ShardId shardId);
/**
* Deletes this shard store since its no longer allocated.
*/
void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException;
void deleteUnallocated(ShardId shardId) throws IOException;
/**
* Return an array of all index folder locations for a given shard
*/
Path[] shardIndexLocations(ShardId shardId);
/**
* Return an array of all translog folder locations for a given shard
*/
Path[] shardTranslogLocations(ShardId shardId);
}

View File

@ -78,9 +78,9 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
protected final IndexService indexService;
protected final IndicesStore indicesStore;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private volatile boolean nodeRateLimiting;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
@ -107,7 +107,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
indexService.settingsService().addListener(applySettings);
this.nodeEnv = nodeEnv;
if (nodeEnv.hasNodeFile()) {
this.locations = nodeEnv.indexPaths(index, indexSettings);
this.locations = nodeEnv.indexPaths(index);
} else {
this.locations = null;
}
@ -125,18 +125,18 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
@Override
public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) {
public boolean canDeleteUnallocated(ShardId shardId) {
if (locations == null) {
return false;
}
if (indexService.hasShard(shardId.id())) {
return false;
}
return FileSystemUtils.exists(nodeEnv.shardPaths(shardId, indexSettings));
return FileSystemUtils.exists(nodeEnv.shardPaths(shardId));
}
@Override
public void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
public void deleteUnallocated(ShardId shardId) throws IOException {
if (locations == null) {
return;
}
@ -144,39 +144,18 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted");
}
try {
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
nodeEnv.deleteShardDirectorySafe(shardId);
} catch (Exception ex) {
logger.debug("failed to delete shard locations", ex);
}
}
/**
* Return an array of all index folder locations for a given shard. Uses
* the index settings to determine if a custom data path is set for the
* index and uses that if applicable.
*/
public Path[] shardIndexLocations(ShardId shardId) {
Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
Path[] locations = new Path[shardLocations.length];
Path[] shardLocations = nodeEnv.shardPaths(shardId);
Path[] shardIndexLocations = new Path[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
locations[i] = shardLocations[i].resolve("index");
shardIndexLocations[i] = shardLocations[i].resolve("index");
}
logger.debug("using [{}] as shard's index location", locations);
return locations;
}
/**
* Return an array of all translog folder locations for a given shard. Uses
* the index settings to determine if a custom data path is set for the
* index and uses that if applicable.
*/
public Path[] shardTranslogLocations(ShardId shardId) {
Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
Path[] locations = new Path[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
locations[i] = shardLocations[i].resolve("translog");
}
logger.debug("using [{}] as shard's translog location", locations);
return locations;
return shardIndexLocations;
}
}

View File

@ -31,15 +31,12 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.*;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
@ -92,14 +89,15 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
@Inject
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
BigArrays bigArrays, IndexStore indexStore) throws IOException {
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv, BigArrays bigArrays) throws IOException {
super(shardId, indexSettings);
this.indexSettingsService = indexSettingsService;
this.bigArrays = bigArrays;
this.locations = indexStore.shardTranslogLocations(shardId);
for (Path location : locations) {
Files.createDirectories(location);
Path[] shardLocations = nodeEnv.shardPaths(shardId);
this.locations = new Path[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
locations[i] = shardLocations[i].resolve("translog");
Files.createDirectories(locations[i]);
}
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));

View File

@ -55,7 +55,6 @@ import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
@ -136,16 +135,16 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
try {
removeIndex(index, "shutdown", false, new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, @IndexSettings Settings indexSettings, List<Throwable> failures) {
public void onAllShardsClosed(Index index, List<Throwable> failures) {
latch.countDown();
}
@Override
public void onShardClosed(ShardId shardId, @IndexSettings Settings indexSettings) {
public void onShardClosed(ShardId shardId) {
}
@Override
public void onShardCloseFailed(ShardId shardId, @IndexSettings Settings indexSettings, Throwable t) {
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
} catch (Throwable e) {
@ -361,9 +360,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
removeIndex(index, reason, true, new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, @IndexSettings Settings indexSettings, List<Throwable> failures) {
public void onAllShardsClosed(Index index, List<Throwable> failures) {
try {
nodeEnv.deleteIndexDirectorySafe(index, indexSettings);
nodeEnv.deleteIndexDirectorySafe(index);
logger.debug("deleted index [{}] from filesystem - failures {}", index, failures);
} catch (Exception e) {
for (Throwable t : failures) {
@ -375,10 +374,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
@Override
public void onShardClosed(ShardId shardId, @IndexSettings Settings indexSettings) {
public void onShardClosed(ShardId shardId) {
try {
// this is called under the shard lock - we can safely delete it
IOUtils.rm(nodeEnv.shardPaths(shardId, indexSettings));
IOUtils.rm(nodeEnv.shardPaths(shardId));
logger.debug("deleted shard [{}] from filesystem", shardId);
} catch (IOException e) {
logger.warn("Can't delete shard {} ", e, shardId);
@ -386,7 +385,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
@Override
public void onShardCloseFailed(ShardId shardId, @IndexSettings Settings indexSettings, Throwable t) {
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
}
@ -483,19 +482,19 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param index the index that got closed
* @param failures the recorded shard closing failures
*/
public void onAllShardsClosed(Index index, @IndexSettings Settings indexSettings, List<Throwable> failures);
public void onAllShardsClosed(Index index, List<Throwable> failures);
/**
* Invoked once the last resource using the given shard ID is released.
* Yet, this method is called while still holding the shards lock such that
* operations on the shards data can safely be executed in this callback.
*/
public void onShardClosed(ShardId shardId, @IndexSettings Settings indexSettings);
public void onShardClosed(ShardId shardId);
/**
* Invoked if closing the given shard failed.
*/
public void onShardCloseFailed(ShardId shardId, @IndexSettings Settings indexSettings, Throwable t);
public void onShardCloseFailed(ShardId shardId, Throwable t);
}
}

View File

@ -23,7 +23,6 @@ import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -165,14 +164,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService == null) {
if (nodeEnv.hasNodeFile()) {
Path[] shardLocations = nodeEnv.shardPaths(shardId, ImmutableSettings.EMPTY);
Path[] shardLocations = nodeEnv.shardPaths(shardId);
if (FileSystemUtils.exists(shardLocations)) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId, indexService.settingsService().getSettings())) {
if (indexService.store().canDeleteUnallocated(shardId)) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
@ -321,15 +320,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
IndexService indexService = indicesService.indexService(shardId.getIndex());
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
if (indexService == null) {
// not physical allocation of the index, delete it from the file system if applicable
if (nodeEnv.hasNodeFile()) {
Path[] shardLocations = nodeEnv.shardPaths(shardId, indexMeta.settings());
Path[] shardLocations = nodeEnv.shardPaths(shardId);
if (FileSystemUtils.exists(shardLocations)) {
logger.debug("{} deleting shard that is no longer used", shardId);
try {
nodeEnv.deleteShardDirectorySafe(shardId, indexMeta.settings());
nodeEnv.deleteShardDirectorySafe(shardId);
} catch (Exception ex) {
logger.debug("failed to delete shard locations", ex);
}
@ -337,10 +335,10 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId, indexMeta.settings())) {
if (indexService.store().canDeleteUnallocated(shardId)) {
logger.debug("{} deleting shard that is no longer used", shardId);
try {
indexService.store().deleteUnallocated(shardId, indexMeta.settings());
indexService.store().deleteUnallocated(shardId);
} catch (Exception e) {
logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId);
}

View File

@ -51,10 +51,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -168,7 +165,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
Path[] shardLocations = nodeEnv.shardPaths(shardId, metaData.settings());
Path[] shardLocations = nodeEnv.shardPaths(shardId);
Path[] shardIndexLocations = new Path[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
shardIndexLocations[i] = shardLocations[i].resolve("index");

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
@ -192,7 +191,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "type", "1").setSource("field", "value").setRefresh(true).execute().actionGet();
logger.info("--> closing all nodes");
Path[] shardLocation = internalCluster().getInstance(NodeEnvironment.class, node_1).shardPaths(new ShardId("test", 0), ImmutableSettings.EMPTY);
Path[] shardLocation = internalCluster().getInstance(NodeEnvironment.class, node_1).shardPaths(new ShardId("test", 0));
assertThat(FileSystemUtils.exists(shardLocation), equalTo(true)); // make sure the data is there!
internalCluster().closeNonSharedNodes(false); // don't wipe data directories the index needs to be there!

View File

@ -21,7 +21,6 @@ package org.elasticsearch.env;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@ -39,13 +38,8 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.CoreMatchers.equalTo;
public class NodeEnvironmentTests extends ElasticsearchTestCase {
private final Settings settings = ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).build();
@Test
public void testNodeLockSingleEnvironment() throws IOException {
NodeEnvironment env = newNodeEnvironment(ImmutableSettings.builder()
@ -99,7 +93,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
@ -132,7 +126,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
final NodeEnvironment env = newNodeEnvironment();
final int numIndices = randomIntBetween(1, 10);
for (int i = 0; i < numIndices; i++) {
for (Path path : env.indexPaths(new Index("foo" + i), settings)) {
for (Path path : env.indexPaths(new Index("foo" + i))) {
Files.createDirectories(path);
}
}
@ -152,47 +146,46 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
assertEquals(new ShardId("foo", 1), fooLock.getShardId());
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
try {
env.deleteShardDirectorySafe(new ShardId("foo", 1), settings);
env.deleteShardDirectorySafe(new ShardId("foo", 1));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("1")));
assertTrue(Files.exists(path.resolve("2")));
}
env.deleteShardDirectorySafe(new ShardId("foo", 2), settings);
env.deleteShardDirectorySafe(new ShardId("foo", 2));
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("1")));
assertFalse(Files.exists(path.resolve("2")));
}
try {
env.deleteIndexDirectorySafe(new Index("foo"), settings);
env.deleteIndexDirectorySafe(new Index("foo"));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
fooLock.close();
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path));
}
env.deleteIndexDirectorySafe(new Index("foo"), settings);
env.deleteIndexDirectorySafe(new Index("foo"));
for (Path path : env.indexPaths(new Index("foo"), settings)) {
for (Path path : env.indexPaths(new Index("foo"))) {
assertFalse(Files.exists(path));
}
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
@ -205,7 +198,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
final int numIndices = randomIntBetween(1, 10);
final Set<ShardId> createdShards = new HashSet<>();
for (int i = 0; i < numIndices; i++) {
for (Path path : env.indexPaths(new Index("foo" + i), settings)) {
for (Path path : env.indexPaths(new Index("foo" + i))) {
final int numShards = randomIntBetween(1, 10);
for (int j = 0; j < numShards; j++) {
Files.createDirectories(path.resolve(Integer.toString(j)));
@ -290,104 +283,4 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
}
env.close();
}
@Test
public void testCustomDataPaths() throws Exception {
String[] dataPaths = tmpPaths();
NodeEnvironment env = newNodeEnvironment(dataPaths, ImmutableSettings.EMPTY);
Settings s1 = ImmutableSettings.EMPTY;
Settings s2 = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build();
ShardId sid = new ShardId("myindex", 0);
Index i = new Index("myindex");
assertFalse("no settings should mean no custom data path", NodeEnvironment.hasCustomDataPath(s1));
assertTrue("settings with path_data should have a custom data path", NodeEnvironment.hasCustomDataPath(s2));
assertThat(env.shardDataPaths(sid, s1), equalTo(env.shardPaths(sid, s1)));
assertThat(env.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/0/myindex/0")}));
assertThat("shard paths with a custom data_path should contain regular paths and custom path",
env.shardPaths(sid, s2),
equalTo(addPaths(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0"),
new String[] {"/tmp/foo/0/myindex/0"})));
assertThat("index paths with no custom settings uses the regular template",
env.indexPaths(i, s1), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
assertThat("index paths with custom data_path setting is the same as the data_path",
env.indexPaths(i, s2),
equalTo(addPaths(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex"),
new String[] {"/tmp/foo/0"})));
env.close();
NodeEnvironment env2 = newNodeEnvironment(dataPaths,
ImmutableSettings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build());
assertThat(env2.shardDataPaths(sid, s1), equalTo(env2.shardPaths(sid, s1)));
assertThat(env2.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/myindex/0")}));
assertThat("shard paths with a custom data_path should contain regular paths and custom path",
env2.shardPaths(sid, s2),
equalTo(addPaths(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0"),
new String[] {"/tmp/foo/myindex/0"})));
assertThat("index paths with no custom settings uses the regular template",
env2.indexPaths(i, s1), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
assertThat("index paths with custom data_path setting is the same as the data_path",
env2.indexPaths(i, s2),
equalTo(addPaths(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex"),
new String[] {"/tmp/foo/"})));
}
/** Converts an array of Strings to an array of Paths, adding an additional child if specified */
private Path[] stringsToPaths(String[] strings, String additional) {
Path[] locations = new Path[strings.length];
for (int i = 0; i < strings.length; i++) {
locations[i] = Paths.get(strings[i], additional);
}
return locations;
}
/** Adds the {@code pathsToAdd} string array to the given paths list and returns it */
private Path[] addPaths(Path[] paths, String[] pathsToAdd) {
Path[] locations = new Path[paths.length + pathsToAdd.length];
for (int i = 0; i < paths.length; i++) {
locations[i] = paths[i];
}
for (int i = paths.length; i < (paths.length + pathsToAdd.length); i++) {
locations[i] = Paths.get(pathsToAdd[i - paths.length]);
}
return locations;
}
public String[] tmpPaths() {
final int numPaths = randomIntBetween(1, 3);
final String[] absPaths = new String[numPaths];
for (int i = 0; i < numPaths; i++) {
absPaths[i] = newTempDirPath().toAbsolutePath().toString();
}
return absPaths;
}
public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(ImmutableSettings.EMPTY);
}
public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Settings build = ImmutableSettings.builder()
.put(settings)
.put("path.home", newTempDirPath().toAbsolutePath().toString())
.put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true)
.putArray("path.data", tmpPaths()).build();
return new NodeEnvironment(build, new Environment(build));
}
public NodeEnvironment newNodeEnvironment(String[] dataPaths, Settings settings) throws IOException {
Settings build = ImmutableSettings.builder()
.put(settings)
.put("path.home", newTempDirPath().toAbsolutePath().toString())
.put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true)
.putArray("path.data", dataPaths).build();
return new NodeEnvironment(build, new Environment(build));
}
}

View File

@ -19,16 +19,9 @@
package org.elasticsearch.gateway;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -40,8 +33,6 @@ import java.util.Map;
public class GatewayShardStateTests extends ElasticsearchTestCase {
private final Settings IDX_SETTINGS = ImmutableSettings.EMPTY;
public void testWriteShardState() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
GatewayShardsState state = new GatewayShardsState(ImmutableSettings.EMPTY, env, null);
@ -49,31 +40,24 @@ public class GatewayShardStateTests extends ElasticsearchTestCase {
long version = between(1, Integer.MAX_VALUE / 2);
boolean primary = randomBoolean();
ShardStateInfo state1 = new ShardStateInfo(version, primary);
state.maybeWriteShardState(id, state1, null, IDX_SETTINGS);
ShardStateInfo shardStateInfo = state.loadShardInfo(id, IDX_SETTINGS);
state.maybeWriteShardState(id, state1, null);
ShardStateInfo shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state1);
ShardStateInfo state2 = new ShardStateInfo(version, primary);
state.maybeWriteShardState(id, state2, state1, IDX_SETTINGS);
shardStateInfo = state.loadShardInfo(id, IDX_SETTINGS);
state.maybeWriteShardState(id, state2, state1);
shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state1);
ShardStateInfo state3 = new ShardStateInfo(version + 1, primary);
state.maybeWriteShardState(id, state3, state1, IDX_SETTINGS);
shardStateInfo = state.loadShardInfo(id, IDX_SETTINGS);
state.maybeWriteShardState(id, state3, state1);
shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state3);
assertTrue(state.getCurrentState().isEmpty());
}
}
public void testPersistRoutingNode() throws Exception {
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("idx")
.settings(settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
)).generateUuidIfNeeded().build();
ClusterState clusterState = new ClusterState(new ClusterName("elasticsearch"), 1, metaData, null, null, null, null);
try (NodeEnvironment env = newNodeEnvironment()) {
GatewayShardsState state = new GatewayShardsState(ImmutableSettings.EMPTY, env, null);
int numShards = between(0, 100);
@ -90,13 +74,13 @@ public class GatewayShardStateTests extends ElasticsearchTestCase {
}
RoutingNode node = new RoutingNode("foo", new DiscoveryNode("foo", null, Version.CURRENT), shards);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMap = state.persistRoutingNodeState(node, clusterState);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMap = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMap.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMap.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey(), IDX_SETTINGS);
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id()), IDX_SETTINGS));
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
assertTrue(state.getCurrentState().isEmpty());
@ -104,13 +88,13 @@ public class GatewayShardStateTests extends ElasticsearchTestCase {
state.getCurrentState().putAll(shardIdShardStateInfoMap);
if (randomBoolean()) { // sometimes write the same thing twice
shardIdShardStateInfoMap = state.persistRoutingNodeState(node, clusterState);
shardIdShardStateInfoMap = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMap.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMap.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey(), IDX_SETTINGS);
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id()), IDX_SETTINGS));
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
}
@ -121,15 +105,15 @@ public class GatewayShardStateTests extends ElasticsearchTestCase {
nextRoundOfShards.add(new MutableShardRouting(routing, routing.version() + 1));
}
node = new RoutingNode("foo", new DiscoveryNode("foo", null, Version.CURRENT), nextRoundOfShards);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMapNew = state.persistRoutingNodeState(node, clusterState);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMapNew = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMapNew.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMapNew.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey(), IDX_SETTINGS);
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
ShardStateInfo oldStateInfo = shardIdShardStateInfoMap.get(written.getKey());
assertEquals(oldStateInfo.version, written.getValue().version - 1);
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id()), IDX_SETTINGS));
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for custom data path locations and templates
*/
public class IndicesCustomDataPathTests extends ElasticsearchIntegrationTest {
private volatile String path;
@Before
public void setup() {
path = newTempDirPath().toAbsolutePath().toString();
}
@After
public void teardown() throws Exception {
IOUtils.deleteFilesIgnoringExceptions(Paths.get(path));
}
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "failing on windows, Lee will look into it")
public void testIndexCreatedWithCustomPathAndTemplate() throws Exception {
final String INDEX = "myindex2";
logger.info("--> creating an index with data_path [{}]", path);
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, path);;
client().admin().indices().prepareCreate(INDEX).setSettings(sb).get();
ensureGreen(INDEX);
indexRandom(true, client().prepareIndex(INDEX, "doc", "1").setSource("{\"body\": \"foo\"}"));
SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
assertAcked(client().admin().indices().prepareDelete(INDEX));
assertPathHasBeenCleared(path);
}
private void assertPathHasBeenCleared(String path) throws Exception {
int count = 0;
StringBuilder sb = new StringBuilder();
sb.append("[");
if (Files.exists(Paths.get(path))) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(path))) {
for (Path file : stream) {
count++;
sb.append(file.toAbsolutePath().toString());
sb.append("\n");
}
}
}
sb.append("]");
assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.nio.file.Files;
@ -165,7 +166,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
private Path shardDirectory(String server, String index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
return env.shardPaths(new ShardId(index, shard), ImmutableSettings.EMPTY)[0];
return env.shardPaths(new ShardId(index, shard))[0];
}
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException {

View File

@ -500,7 +500,7 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
logger.info("--> verifying no temporary recoveries are left");
for (String node : internalCluster().getNodeNames()) {
NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node);
for (final Path shardLoc : nodeEnvironment.shardPaths(new ShardId(indexName, 0), ImmutableSettings.EMPTY)) {
for (final Path shardLoc : nodeEnvironment.shardPaths(new ShardId(indexName, 0))) {
assertBusy(new Runnable() {
@Override
public void run() {

View File

@ -61,7 +61,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -724,11 +723,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
}
/** Used to signify whether a custom data path should be randomly used */
protected boolean useCustomDataPath() {
return true;
}
/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
@ -745,12 +739,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (numberOfReplicas >= 0) {
builder.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
}
// 30% of the time
if ((randomInt(9) < 3) && useCustomDataPath()) {
String dataPath = "data/custom-" + CHILD_JVM_ID + "/" + UUID.randomUUID().toString();
logger.info("using custom data_path for index: [{}]", dataPath);
builder.put(IndexMetaData.SETTING_DATA_PATH, dataPath);
}
return builder.build();
}

View File

@ -277,7 +277,6 @@ public final class InternalTestCluster extends TestCluster {
builder.put("script.disable_dynamic", false);
builder.put("http.pipelining", enableHttpPipelining);
builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false);
builder.put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true);
if (Strings.hasLength(System.getProperty("es.logger.level"))) {
builder.put("logger.level", System.getProperty("es.logger.level"));
}