Add index.data_path setting
This allows specifying the path an index will be stored at. `index.data_path` is specified in the settings when creating an index, and cannot be changed dynamically. An example request would look like:

POST /myindex
{
  "settings": {
    "number_of_shards": 2,
    "data_path": "/tmp/myindex"
  }
}

This would put data in /tmp/myindex/0/index/0 and /tmp/myindex/0/index/1.

Since this can be used to write data to arbitrary locations on disk, it requires enabling the `node.enable_custom_paths` setting in elasticsearch.yml on all nodes.

Relates to #8976
parent 582d5e8d3c
commit a4e2230ebd
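Before the diff, a brief sketch of the node-side configuration involved. The setting names and defaults are taken from the NodeEnvironment changes in this commit; the elasticsearch.yml placement is the usual form:

    # elasticsearch.yml on every node that may hold shards of such an index
    node.enable_custom_paths: true        # required; defaults to false
    node.add_id_to_custom_path: true      # optional; appends the node id to the custom path (default true)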
@@ -48,9 +48,9 @@ import java.util.Set;
 import static com.google.common.collect.Maps.newHashMap;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
+import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
 import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
 import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
-import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
 
 /**
  * A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}.
 
@@ -167,6 +167,7 @@ public class IndexMetaData {
     public static final String SETTING_UUID = "index.uuid";
     public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
     public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
+    public static final String SETTING_DATA_PATH = "index.data_path";
     public static final String INDEX_UUID_NA_VALUE = "_na_";
 
     // hard-coded hash function as of 2.0
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
@@ -57,6 +58,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
@@ -64,6 +66,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.indices.IndexCreationException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.InvalidIndexNameException;
 import org.elasticsearch.river.RiverIndexName;
@@ -101,11 +104,13 @@ public class MetaDataCreateIndexService extends AbstractComponent {
     private final String riverIndexName;
     private final AliasValidator aliasValidator;
     private final IndexTemplateFilter indexTemplateFilter;
+    private final NodeEnvironment nodeEnv;
 
     @Inject
-    public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
-                                      AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName,
-                                      AliasValidator aliasValidator, Set<IndexTemplateFilter> indexTemplateFilters) {
+    public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService,
+                                      IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
+                                      Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
+                                      Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
         super(settings);
         this.environment = environment;
         this.threadPool = threadPool;
@@ -116,6 +121,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
         this.version = version;
         this.riverIndexName = riverIndexName;
         this.aliasValidator = aliasValidator;
+        this.nodeEnv = nodeEnv;
 
         if (indexTemplateFilters.isEmpty()) {
             this.indexTemplateFilter = DEFAULT_INDEX_TEMPLATE_FILTER;
@@ -554,6 +560,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
 
     private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticsearchException {
         validateIndexName(request.index(), state);
+        String customPath = request.settings().get(IndexMetaData.SETTING_DATA_PATH, null);
+        if (customPath != null && nodeEnv.isCustomPathsEnabled() == false) {
+            throw new IndexCreationException(new Index(request.index()),
+                    new ElasticsearchIllegalArgumentException("custom data_paths for indices is disabled"));
+        }
     }
 
     private static class DefaultIndexTemplateFilter implements IndexTemplateFilter {
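A hypothetical illustration (not part of this commit) of the validate(...) gate added above, using the constants it introduces:

    Settings requestSettings = ImmutableSettings.settingsBuilder()
            .put(IndexMetaData.SETTING_DATA_PATH, "/tmp/myindex")
            .build();
    // With node.enable_custom_paths left at its default (false), creating an
    // index with these settings fails with an IndexCreationException wrapping
    // ElasticsearchIllegalArgumentException("custom data_paths for indices is disabled").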
@@ -24,14 +24,19 @@ import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import org.apache.lucene.store.*;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.Closeable;
@@ -52,15 +57,30 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     /* ${data.paths}/nodes/{node.id}/indices */
     private final Path[] nodeIndicesPaths;
     private final Lock[] locks;
+    private final boolean addNodeId;
+
     private final int localNodeId;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Map<ShardId, InternalShardLock> shardLocks = new HashMap<>();
 
+    private final boolean customPathsEnabled;
+
+    // Setting to automatically append node id to custom data paths
+    public static final String ADD_NODE_ID_TO_CUSTOM_PATH = "node.add_id_to_custom_path";
+    // Setting to enable custom index.data_path setting for new indices
+    public static final String SETTING_CUSTOM_DATA_PATH_ENABLED = "node.enable_custom_paths";
+
+    public static final String NODES_FOLDER = "nodes";
+    public static final String INDICES_FOLDER = "indices";
+    public static final String NODE_LOCK_FILENAME = "node.lock";
 
     @Inject
     public NodeEnvironment(Settings settings, Environment environment) throws IOException {
         super(settings);
 
+        this.addNodeId = settings.getAsBoolean(ADD_NODE_ID_TO_CUSTOM_PATH, true);
+        this.customPathsEnabled = settings.getAsBoolean(SETTING_CUSTOM_DATA_PATH_ENABLED, false);
+
         if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
             nodePaths = null;
             nodeIndicesPaths = null;
@@ -76,14 +96,14 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
         int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50);
         for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
             for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
-                Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(Paths.get("nodes", Integer.toString(possibleLockId)));
+                Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(Paths.get(NODES_FOLDER, Integer.toString(possibleLockId)));
                 if (Files.exists(dir) == false) {
                     Files.createDirectories(dir);
                 }
 
                 try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
                     logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
-                    Lock tmpLock = luceneDir.makeLock("node.lock");
+                    Lock tmpLock = luceneDir.makeLock(NODE_LOCK_FILENAME);
                     boolean obtained = tmpLock.obtain();
                     if (obtained) {
                         locks[dirIndex] = tmpLock;
@@ -117,7 +137,8 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
             }
         }
         if (locks[0] == null) {
-            throw new ElasticsearchIllegalStateException("Failed to obtain node lock, is the following location writable?: " + Arrays.toString(environment.dataWithClusterFiles()), lastException);
+            throw new ElasticsearchIllegalStateException("Failed to obtain node lock, is the following location writable?: "
+                    + Arrays.toString(environment.dataWithClusterFiles()), lastException);
         }
 
         this.localNodeId = localNodeId;
@@ -131,14 +152,20 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
         if (logger.isTraceEnabled()) {
             StringBuilder sb = new StringBuilder("node data locations details:\n");
             for (Path file : nodePaths) {
-                sb.append(" -> ").append(file.toAbsolutePath()).append(", free_space [").append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace())).append("], usable_space [").append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace())).append("]\n");
+                sb.append(" -> ")
+                        .append(file.toAbsolutePath())
+                        .append(", free_space [")
+                        .append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace()))
+                        .append("], usable_space [")
+                        .append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace()))
+                        .append("]\n");
             }
             logger.trace(sb.toString());
         }
 
         this.nodeIndicesPaths = new Path[nodePaths.length];
         for (int i = 0; i < nodePaths.length; i++) {
-            nodeIndicesPaths[i] = nodePaths[i].resolve("indices");
+            nodeIndicesPaths[i] = nodePaths[i].resolve(INDICES_FOLDER);
         }
     }
 
@@ -150,10 +177,20 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     * @param shardId the id of the shard to delete to delete
     * @throws IOException if an IOException occurs
     */
-    public void deleteShardDirectorySafe(ShardId shardId) throws IOException {
+    public void deleteShardDirectorySafe(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
+        // This is to ensure someone doesn't use ImmutableSettings.EMPTY
+        assert indexSettings != ImmutableSettings.EMPTY;
         final Path[] paths = shardPaths(shardId);
+        logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
         try (Closeable lock = shardLock(shardId)) {
             IOUtils.rm(paths);
+            if (hasCustomDataPath(indexSettings)) {
+                Path customLocation = resolveCustomLocation(indexSettings, shardId);
+                logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
+                IOUtils.rm(customLocation);
+            }
+            logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
+            assert FileSystemUtils.exists(paths) == false;
         }
     }
@@ -166,21 +203,25 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     * @param lockTimeoutMS how long to wait for acquiring the indices shard locks
     * @throws Exception if any of the shards data directories can't be locked or deleted
     */
-    public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOException {
+    public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException {
+        // This is to ensure someone doesn't use ImmutableSettings.EMPTY
+        assert indexSettings != ImmutableSettings.EMPTY;
         final List<ShardLock> locks = lockAllForIndex(index, lockTimeoutMS);
         try {
-            final Path[] indexPaths = new Path[nodeIndicesPaths.length];
-            for (int i = 0; i < indexPaths.length; i++) {
-                indexPaths[i] = nodeIndicesPaths[i].resolve(index.name());
-            }
+            final Path[] indexPaths = indexPaths(index);
+            logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
             IOUtils.rm(indexPaths);
+            if (hasCustomDataPath(indexSettings)) {
+                Path customLocation = resolveCustomLocation(indexSettings, index.name());
+                logger.trace("deleting custom index {} directory [{}]", index, customLocation);
+                IOUtils.rm(customLocation);
+            }
         } finally {
             IOUtils.closeWhileHandlingException(locks);
         }
     }
 
 
 
     /**
     * Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
     * an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
@@ -192,7 +233,8 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     */
     public List<ShardLock> lockAllForIndex(Index index, long lockTimeoutMS) throws IOException {
         Set<ShardId> allShardIds = findAllShardIds(index);
-        List<ShardLock> allLocks = new ArrayList<>();
+        logger.trace("locking all shards for index {} - [{}]", index, allShardIds);
+        List<ShardLock> allLocks = new ArrayList<>(allShardIds.size());
         boolean success = false;
         long startTime = System.currentTimeMillis();
         try {
@@ -203,6 +245,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
             success = true;
         } finally {
             if (success == false) {
+                logger.trace("unable to lock all shards for index {}", index);
                 IOUtils.closeWhileHandlingException(allLocks);
             }
         }
@@ -236,6 +279,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     * @throws IOException if an IOException occurs.
     */
     public ShardLock shardLock(final ShardId id, long lockTimeoutMS) throws IOException {
+        logger.trace("acquiring node shardlock on [{}], timeout [{}]", id, lockTimeoutMS);
         final InternalShardLock shardLock;
         final boolean acquired;
         synchronized (shardLocks) {
@@ -260,10 +304,12 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
                 }
             }
         }
+        logger.trace("successfully acquired shardlock for [{}]", id);
         return new ShardLock(id) { // new instance prevents double closing
             @Override
             protected void closeInternal() {
                 shardLock.release();
+                logger.trace("released shard lock for [{}]", id);
             }
         };
     }
@@ -309,7 +355,10 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
         private void decWaitCount() {
             synchronized (shardLocks) {
                 assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
-                if (--waitCount == 0) {
+                --waitCount;
+                logger.trace("shard lock wait count for [{}] is now [{}]", shardId, waitCount);
+                if (waitCount == 0) {
+                    logger.trace("last shard lock wait decremented, removing lock for [{}]", shardId);
                     InternalShardLock remove = shardLocks.remove(shardId);
                     assert remove != null : "Removed lock was null";
                 }
@@ -349,7 +398,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     }
 
     /**
-     * Returns all data paths for the given index.
+     * Returns all data paths excluding custom index paths.
     */
     public Path[] indexPaths(Index index) {
         assert assertEnvIsLocked();
@@ -361,14 +410,30 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     }
 
     /**
-     * Returns all data paths for the given shards ID
+     * Returns all paths where lucene data will be stored, if a index.data_path
+     * setting is present, will return the custom data path to be used
+     */
+    public Path[] shardDataPaths(ShardId shardId, @IndexSettings Settings indexSettings) {
+        assert indexSettings != ImmutableSettings.EMPTY;
+        assert assertEnvIsLocked();
+        if (hasCustomDataPath(indexSettings)) {
+            return new Path[] {resolveCustomLocation(indexSettings, shardId)};
+        } else {
+            return shardPaths(shardId);
+        }
+    }
+
+    /**
+     * Returns all shard paths excluding custom shard path
     */
     public Path[] shardPaths(ShardId shardId) {
         assert assertEnvIsLocked();
         final Path[] nodePaths = nodeDataPaths();
         final Path[] shardLocations = new Path[nodePaths.length];
         for (int i = 0; i < nodePaths.length; i++) {
-            shardLocations[i] = nodePaths[i].resolve(Paths.get("indices", shardId.index().name(), Integer.toString(shardId.id())));
+            shardLocations[i] = nodePaths[i].resolve(Paths.get(INDICES_FOLDER,
+                    shardId.index().name(),
+                    Integer.toString(shardId.id())));
         }
         return shardLocations;
     }
@@ -395,14 +460,14 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
     }
 
     /**
-     * Tries to find all allocated shards for the given index or for all indices iff the given index is <code>null</code>
+     * Tries to find all allocated shards for the given index or for all indices iff the given index is {@code null}
     * on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might not
     * see directories created concurrently or while it's traversing.
-     * @param index the index to filter shards for or <code>null</code> if all shards for all indices should be listed
+     * @param index the index to filter shards for or {@code null} if all shards for all indices should be listed
     * @return a set of shard IDs
     * @throws IOException if an IOException occurs
     */
-    public Set<ShardId> findAllShardIds(@Nullable final Index index) throws IOException {
+    public Set<ShardId> findAllShardIds(final Index index) throws IOException {
        if (nodePaths == null || locks == null) {
            throw new ElasticsearchIllegalStateException("node is not configured to store local location");
        }
@@ -435,7 +500,8 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
                 if (Files.exists(shardPath) && Files.isDirectory(shardPath)) {
                     Integer shardId = Ints.tryParse(shardPath.getFileName().toString());
                     if (shardId != null) {
-                        shardIds.add(new ShardId(currentIndex, shardId));
+                        ShardId id = new ShardId(currentIndex, shardId);
+                        shardIds.add(id);
                     }
                 }
             }
@@ -500,7 +566,9 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
             try {
                 Files.move(src, target, StandardCopyOption.ATOMIC_MOVE);
             } catch (AtomicMoveNotSupportedException ex) {
-                throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + directory + "] atomic_move is required for elasticsearch to work correctly.", ex);
+                throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path ["
+                        + directory
+                        + "] atomic_move is required for elasticsearch to work correctly.", ex);
             } finally {
                 Files.deleteIfExists(src);
                 Files.deleteIfExists(target);
@@ -512,4 +580,63 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
         return settings;
     }
 
+    /** return true if custom paths are allowed for indices */
+    public boolean isCustomPathsEnabled() {
+        return customPathsEnabled;
+    }
+
+    /**
+     * @param indexSettings settings for an index
+     * @return true if the index has a custom data path
+     */
+    public static boolean hasCustomDataPath(@IndexSettings Settings indexSettings) {
+        return indexSettings.get(IndexMetaData.SETTING_DATA_PATH) != null;
+    }
+
+    /**
+     * Resolve the custom path for a index's shard.
+     * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
+     * the root path for the index.
+     *
+     * @param indexSettings settings for the index
+     */
+    private Path resolveCustomLocation(@IndexSettings Settings indexSettings) {
+        assert indexSettings != ImmutableSettings.EMPTY;
+        String customDataDir = indexSettings.get(IndexMetaData.SETTING_DATA_PATH);
+        if (customDataDir != null) {
+            // This assert is because this should be caught by MetaDataCreateIndexService
+            assert customPathsEnabled;
+            if (addNodeId) {
+                return Paths.get(customDataDir, Integer.toString(this.localNodeId));
+            } else {
+                return Paths.get(customDataDir);
+            }
+        } else {
+            throw new ElasticsearchIllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available");
+        }
+    }
+
+    /**
+     * Resolve the custom path for a index's shard.
+     * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
+     * the root path for the index.
+     *
+     * @param indexSettings settings for the index
+     * @param indexName index to resolve the path for
+     */
+    private Path resolveCustomLocation(@IndexSettings Settings indexSettings, final String indexName) {
+        return resolveCustomLocation(indexSettings).resolve(indexName);
+    }
+
+    /**
+     * Resolve the custom path for a index's shard.
+     * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
+     * the root path for the index.
+     *
+     * @param indexSettings settings for the index
+     * @param shardId shard to resolve the path to
+     */
+    public Path resolveCustomLocation(@IndexSettings Settings indexSettings, final ShardId shardId) {
+        return resolveCustomLocation(indexSettings, shardId.index().name()).resolve(Integer.toString(shardId.id()));
+    }
 }
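Reading the resolveCustomLocation(...) overloads above, a shard's custom data path resolves to <index.data_path>/<local node id>/<index name>/<shard id> when node.add_id_to_custom_path is true (the default). A compact sketch, with placeholder values chosen purely for illustration:

    String customDataDir = "/tmp/myindex"; // the index.data_path setting
    int localNodeId = 0;                   // appended because addNodeId is true
    Path shardDataPath = Paths.get(customDataDir, Integer.toString(localNodeId))
            .resolve("myindex")            // index name
            .resolve(Integer.toString(0)); // shard id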
@@ -32,6 +32,8 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.NodeEnvironment;
 
+import java.nio.file.Path;
+
 /**
  *
  */
@@ -165,7 +167,9 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
 
     public void reset() throws Exception {
         try {
-            IOUtils.rm(nodeEnv.nodeDataPaths());
+            Path[] dataPaths = nodeEnv.nodeDataPaths();
+            logger.trace("removing node data paths: [{}]", dataPaths);
+            IOUtils.rm(dataPaths);
         } catch (Exception ex) {
             logger.debug("failed to delete shard locations", ex);
         }
@@ -50,12 +50,11 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import java.nio.file.*;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -270,7 +269,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                 // it may take a couple of seconds for outstanding shard reference
                 // to release their refs (for example, on going recoveries)
                 // we are working on a better solution see: https://github.com/elasticsearch/elasticsearch/pull/8608
-                nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis());
+                nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis(), current.settings());
             } catch (LockObtainFailedException ex) {
                 logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
             } catch (Exception ex) {
@@ -321,7 +320,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                 final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0);
                 if (shardLocks.isEmpty()) {
                     // no shards - try to remove the directory
-                    nodeEnv.deleteIndexDirectorySafe(index, 0);
+                    nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings());
                     continue;
                 }
                 IOUtils.closeWhileHandlingException(shardLocks);
@@ -335,7 +334,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
             } else if (danglingTimeout.millis() == 0) {
                 logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
                 try {
-                    nodeEnv.deleteIndexDirectorySafe(index, 0);
+                    nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings());
                 } catch (LockObtainFailedException ex) {
                     logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
                 } catch (Exception ex) {
@@ -343,7 +342,11 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                 }
             } else {
                 logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
-                danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(index))));
+                danglingIndices.put(indexName,
+                        new DanglingIndex(indexName,
+                                threadPool.schedule(danglingTimeout,
+                                        ThreadPool.Names.SAME,
+                                        new RemoveDanglingIndex(index, indexMetaData.settings()))));
             }
         }
     }
@@ -441,7 +444,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
         final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version();
         final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, deleteOldFiles);
         try {
-            writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(), nodeEnv.indexPaths(new Index(indexMetaData.index())));
+            writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(),
+                    nodeEnv.indexPaths(new Index(indexMetaData.index())));
         } catch (Throwable ex) {
             logger.warn("[{}]: failed to write index state", ex, indexMetaData.index());
             throw new IOException("failed to write state for [" + indexMetaData.index() + "]", ex);
@@ -482,7 +486,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
 
     @Nullable
     private IndexMetaData loadIndexState(String index) throws IOException {
-        return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true), INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
+        return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true),
+                INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
     }
 
     private MetaData loadGlobalState() throws IOException {
@@ -553,9 +558,11 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
     class RemoveDanglingIndex implements Runnable {
 
         private final Index index;
+        private final Settings indexSettings;
 
-        RemoveDanglingIndex(Index index) {
+        RemoveDanglingIndex(Index index, @IndexSettings Settings indexSettings) {
             this.index = index;
+            this.indexSettings = indexSettings;
         }
 
         @Override
@@ -570,7 +577,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
 
                 try {
                     MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
-                    nodeEnv.deleteIndexDirectorySafe(index, 0);
+                    nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
                 } catch (Exception ex) {
                     logger.debug("failed to delete dangling index", ex);
                 }
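Note the pattern these call sites establish: every deletion path now threads the index's actual settings through (never ImmutableSettings.EMPTY, which NodeEnvironment asserts against), so any custom location is cleaned up as well. A minimal caller sketch using the new signature:

    // indexMetaData.settings() carries index.data_path if one was set
    nodeEnv.deleteIndexDirectorySafe(index, deleteTimeout.millis(), indexMetaData.settings());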
@@ -26,18 +26,26 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.shard.ShardId;
 
-import java.io.*;
-import java.nio.file.*;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -56,7 +64,8 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
     private volatile Map<ShardId, ShardStateInfo> currentState = Maps.newHashMap();
 
     @Inject
-    public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
+    public GatewayShardsState(Settings settings, NodeEnvironment nodeEnv,
+                              TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
         super(settings);
         this.nodeEnv = nodeEnv;
         if (listGatewayStartedShards != null) { // for testing
@@ -125,7 +134,7 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
             ShardId shardId = shardRouting.shardId();
             ShardStateInfo shardStateInfo = new ShardStateInfo(shardRouting.version(), shardRouting.primary());
             final ShardStateInfo previous = currentState.get(shardId);
-            if(maybeWriteShardState(shardId, shardStateInfo, previous) ) {
+            if (maybeWriteShardState(shardId, shardStateInfo, previous)) {
                 newState.put(shardId, shardStateInfo);
             } else if (previous != null) {
                 currentState.put(shardId, previous);
@@ -146,8 +155,10 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
         } else if (previousState.version < shardStateInfo.version) {
             writeReason = "version changed from [" + previousState.version + "] to [" + shardStateInfo.version + "]";
         } else {
-            logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]");
-            assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
+            logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" +
+                    previousState.version + "] current version [" + shardStateInfo.version + "]");
+            assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId +
+                    " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
             return previousState.version == shardStateInfo.version;
         }
 
@@ -182,13 +193,16 @@ public class GatewayShardsState extends AbstractComponent implements ClusterStat
     }
 
     private ShardStateInfo loadShardStateInfo(ShardId shardId) throws IOException {
-        return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN, shardId.toString(), nodeEnv.shardPaths(shardId));
+        return MetaDataStateFormat.loadLatestState(logger, newShardStateInfoFormat(false), SHARD_STATE_FILE_PATTERN,
+                shardId.toString(), nodeEnv.shardPaths(shardId));
     }
 
-    private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception {
+    private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo,
+                                 @Nullable ShardStateInfo previousStateInfo) throws Exception {
         logger.trace("{} writing shard state, reason [{}]", shardId, reason);
         final boolean deleteOldFiles = previousStateInfo != null && previousStateInfo.version != shardStateInfo.version;
-        newShardStateInfoFormat(deleteOldFiles).write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version, nodeEnv.shardPaths(shardId));
+        MetaDataStateFormat<ShardStateInfo> stateFormat = newShardStateInfoFormat(deleteOldFiles);
+        stateFormat.write(shardStateInfo, SHARD_STATE_FILE_PREFIX, shardStateInfo.version, nodeEnv.shardPaths(shardId));
     }
 
     private MetaDataStateFormat<ShardStateInfo> newShardStateInfoFormat(boolean deleteOldFiles) {
@@ -31,14 +31,14 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.*;
 
-import java.io.*;
-import java.nio.file.*;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -20,6 +20,8 @@
 package org.elasticsearch.index.store;
 
 import org.apache.lucene.store.StoreRateLimiting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.Closeable;
@@ -44,14 +46,22 @@ public interface IndexStore extends Closeable {
 
     /**
      * Returns <tt>true</tt> if this shard is allocated on this node. Allocated means
-     * that it has storage files that can be deleted using {@link #deleteUnallocated(org.elasticsearch.index.shard.ShardId)}.
+     * that it has storage files that can be deleted using {@code deleteUnallocated(ShardId, Settings)}.
     */
-    boolean canDeleteUnallocated(ShardId shardId);
+    boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings);
 
     /**
     * Deletes this shard store since its no longer allocated.
    */
-    void deleteUnallocated(ShardId shardId) throws IOException;
+    void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException;
 
+    /**
+     * Return an array of all index folder locations for a given shard
+     */
     Path[] shardIndexLocations(ShardId shardId);
+
+    /**
+     * Return an array of all translog folder locations for a given shard
+     */
+    Path[] shardTranslogLocations(ShardId shardId);
 }
@@ -46,6 +46,9 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
     public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
     public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
 
+    public static final String INDEX_FOLDER_NAME = "index";
+    public static final String TRANSLOG_FOLDER_NAME = "translog";
+
     class ApplySettings implements IndexSettingsService.Listener {
         @Override
         public void onRefreshSettings(Settings settings) {
@@ -125,7 +128,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
 
 
     @Override
-    public boolean canDeleteUnallocated(ShardId shardId) {
+    public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) {
         if (locations == null) {
             return false;
         }
@@ -136,7 +139,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
     }
 
     @Override
-    public void deleteUnallocated(ShardId shardId) throws IOException {
+    public void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
         if (locations == null) {
             return;
         }
@@ -144,18 +147,39 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
             throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted");
         }
         try {
-            nodeEnv.deleteShardDirectorySafe(shardId);
+            nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
         } catch (Exception ex) {
             logger.debug("failed to delete shard locations", ex);
         }
     }
 
+    /**
+     * Return an array of all index folder locations for a given shard. Uses
+     * the index settings to determine if a custom data path is set for the
+     * index and uses that if applicable.
+     */
     public Path[] shardIndexLocations(ShardId shardId) {
-        Path[] shardLocations = nodeEnv.shardPaths(shardId);
-        Path[] shardIndexLocations = new Path[shardLocations.length];
+        Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
+        Path[] locations = new Path[shardLocations.length];
         for (int i = 0; i < shardLocations.length; i++) {
-            shardIndexLocations[i] = shardLocations[i].resolve("index");
+            locations[i] = shardLocations[i].resolve(INDEX_FOLDER_NAME);
         }
-        return shardIndexLocations;
+        logger.debug("using [{}] as shard's index location", locations);
+        return locations;
+    }
+
+    /**
+     * Return an array of all translog folder locations for a given shard. Uses
+     * the index settings to determine if a custom data path is set for the
+     * index and uses that if applicable.
+     */
+    public Path[] shardTranslogLocations(ShardId shardId) {
+        Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
+        Path[] locations = new Path[shardLocations.length];
+        for (int i = 0; i < shardLocations.length; i++) {
+            locations[i] = shardLocations[i].resolve(TRANSLOG_FOLDER_NAME);
+        }
+        logger.debug("using [{}] as shard's translog location", locations);
+        return locations;
     }
 }
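Per the two accessors above, each shard data path (custom or default) gets an "index" folder for Lucene files and a "translog" folder. A short illustrative consumer of the new interface methods:

    Path[] luceneDirs = indexStore.shardIndexLocations(shardId);      // .../<shard>/index
    Path[] translogDirs = indexStore.shardTranslogLocations(shardId); // .../<shard>/translog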
@ -31,12 +31,15 @@ import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.*;
+import org.elasticsearch.index.store.IndexStore;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogException;
+import org.elasticsearch.index.translog.TranslogStats;
+import org.elasticsearch.index.translog.TranslogStreams;

 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
@ -89,15 +92,14 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog

     @Inject
-    public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv, BigArrays bigArrays) throws IOException {
+    public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
+                      BigArrays bigArrays, IndexStore indexStore) throws IOException {
         super(shardId, indexSettings);
         this.indexSettingsService = indexSettingsService;
         this.bigArrays = bigArrays;
-        Path[] shardLocations = nodeEnv.shardPaths(shardId);
-        this.locations = new Path[shardLocations.length];
-        for (int i = 0; i < shardLocations.length; i++) {
-            locations[i] = shardLocations[i].resolve("translog");
-            Files.createDirectories(locations[i]);
-        }
+        this.locations = indexStore.shardTranslogLocations(shardId);
+        for (Path location : locations) {
+            Files.createDirectories(location);
+        }

         this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));

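Design note on the hunk above: the translog no longer derives its directories from NodeEnvironment directly; it asks the IndexStore, which is the one component that knows whether index.data_path applies. A minimal sketch of the resulting consumer pattern, assuming the indexStore and shardId shown above:

    // Any shard-level component can obtain custom-path-aware locations from
    // the store and only needs to ensure the directories exist up front;
    // Files.createDirectories is a no-op for directories already present.
    Path[] translogDirs = indexStore.shardTranslogLocations(shardId);
    for (Path dir : translogDirs) {
        Files.createDirectories(dir);
    }
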
@ -54,6 +54,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
+import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
@ -68,6 +69,7 @@ import org.elasticsearch.plugins.PluginsService;

 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@ -20,16 +20,15 @@
 package org.elasticsearch.indices.store;

 import org.apache.lucene.store.StoreRateLimiting;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.StreamInput;
@ -40,9 +39,9 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.threadpool.ThreadPool;
@ -171,7 +170,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
             }
         } else {
             if (!indexService.hasShard(shardId.id())) {
-                if (indexService.store().canDeleteUnallocated(shardId)) {
+                if (indexService.store().canDeleteUnallocated(shardId, indexService.settingsService().getSettings())) {
                     deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
                 }
             }
@ -229,6 +228,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
         return true;
     }

+    // TODO will have to amend this for shadow replicas so we don't delete the shared copy...
     private void deleteShardIfExistElseWhere(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
         List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size());
         String indexUUID = state.getMetaData().index(indexShardRoutingTable.shardId().getIndex()).getUUID();
@ -320,6 +320,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
         }

         IndexService indexService = indicesService.indexService(shardId.getIndex());
+        IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
         if (indexService == null) {
             // not physical allocation of the index, delete it from the file system if applicable
             if (nodeEnv.hasNodeFile()) {
@ -327,7 +328,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
                 if (FileSystemUtils.exists(shardLocations)) {
                     logger.debug("{} deleting shard that is no longer used", shardId);
                     try {
-                        nodeEnv.deleteShardDirectorySafe(shardId);
+                        nodeEnv.deleteShardDirectorySafe(shardId, indexMeta.settings());
                     } catch (Exception ex) {
                         logger.debug("failed to delete shard locations", ex);
                     }
@ -335,10 +336,10 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
             }
         } else {
             if (!indexService.hasShard(shardId.id())) {
-                if (indexService.store().canDeleteUnallocated(shardId)) {
+                if (indexService.store().canDeleteUnallocated(shardId, indexMeta.settings())) {
                     logger.debug("{} deleting shard that is no longer used", shardId);
                     try {
-                        indexService.store().deleteUnallocated(shardId);
+                        indexService.store().deleteUnallocated(shardId, indexMeta.settings());
                     } catch (Exception e) {
                         logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId);
                     }

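The last two hunks repeat the same guarded delete, now parameterized by the index settings so the store can locate shards kept under a custom index.data_path rather than the node's data directories. A condensed sketch using the names from the hunks; the wrapper method itself is hypothetical:

    // Hypothetical wrapper showing the guard-then-delete shape used above.
    private void deleteUnusedShard(IndexService indexService, ShardId shardId, Settings indexSettings) {
        if (indexService.store().canDeleteUnallocated(shardId, indexSettings)) {
            logger.debug("{} deleting shard that is no longer used", shardId);
            try {
                indexService.store().deleteUnallocated(shardId, indexSettings);
            } catch (Exception e) {
                logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId);
            }
        }
    }
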
@ -48,7 +48,7 @@ import java.util.Arrays;
 import java.util.List;

 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
-import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
 import static org.hamcrest.Matchers.equalTo;

 /**

@ -21,6 +21,7 @@ package org.elasticsearch.env;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -40,8 +41,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.hamcrest.CoreMatchers.equalTo;
+
 public class NodeEnvironmentTests extends ElasticsearchTestCase {

+    private final Settings idxSettings = ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).build();
+
     @Test
     public void testNodeLockSingleEnvironment() throws IOException {
         NodeEnvironment env = newNodeEnvironment(ImmutableSettings.builder()
@ -154,7 +160,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
         }

         try {
-            env.deleteShardDirectorySafe(new ShardId("foo", 1));
+            env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings);
             fail("shard is locked");
         } catch (LockObtainFailedException ex) {
             // expected
@ -166,7 +172,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {

         }

-        env.deleteShardDirectorySafe(new ShardId("foo", 2));
+        env.deleteShardDirectorySafe(new ShardId("foo", 2), idxSettings);

         for (Path path : env.indexPaths(new Index("foo"))) {
             assertTrue(Files.exists(path.resolve("1")));
@ -174,7 +180,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
         }

         try {
-            env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10));
+            env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10), idxSettings);
             fail("shard is locked");
         } catch (LockObtainFailedException ex) {
             // expected
@ -204,7 +210,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
             t.start();
         }

-        env.deleteIndexDirectorySafe(new Index("foo"), 5000);
+        env.deleteIndexDirectorySafe(new Index("foo"), 5000, idxSettings);

         assertNull(threadException.get());

@ -306,4 +312,84 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
         }
         env.close();
     }
+
+    @Test
+    public void testCustomDataPaths() throws Exception {
+        String[] dataPaths = tmpPaths();
+        NodeEnvironment env = newNodeEnvironment(dataPaths, ImmutableSettings.EMPTY);
+
+        Settings s1 = ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build();
+        Settings s2 = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build();
+        ShardId sid = new ShardId("myindex", 0);
+        Index i = new Index("myindex");
+
+        assertFalse("no settings should mean no custom data path", NodeEnvironment.hasCustomDataPath(s1));
+        assertTrue("settings with data_path should have a custom data path", NodeEnvironment.hasCustomDataPath(s2));
+
+        assertThat(env.shardDataPaths(sid, s1), equalTo(env.shardPaths(sid)));
+        assertThat(env.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/0/myindex/0")}));
+
+        assertThat("shard paths with a custom data_path should contain only regular paths",
+                env.shardPaths(sid),
+                equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0")));
+
+        assertThat("index paths use the regular template",
+                env.indexPaths(i), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
+
+        env.close();
+        NodeEnvironment env2 = newNodeEnvironment(dataPaths,
+                ImmutableSettings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build());
+
+        assertThat(env2.shardDataPaths(sid, s1), equalTo(env2.shardPaths(sid)));
+        assertThat(env2.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/myindex/0")}));
+
+        assertThat("shard paths with a custom data_path should contain only regular paths",
+                env2.shardPaths(sid),
+                equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex/0")));
+
+        assertThat("index paths use the regular template",
+                env2.indexPaths(i), equalTo(stringsToPaths(dataPaths, "elasticsearch/nodes/0/indices/myindex")));
+
+        env2.close();
+    }
+
+    /** Converts an array of Strings to an array of Paths, adding an additional child if specified */
+    private Path[] stringsToPaths(String[] strings, String additional) {
+        Path[] locations = new Path[strings.length];
+        for (int i = 0; i < strings.length; i++) {
+            locations[i] = Paths.get(strings[i], additional);
+        }
+        return locations;
+    }
+
+    public String[] tmpPaths() {
+        final int numPaths = randomIntBetween(1, 3);
+        final String[] absPaths = new String[numPaths];
+        for (int i = 0; i < numPaths; i++) {
+            absPaths[i] = newTempDirPath().toAbsolutePath().toString();
+        }
+        return absPaths;
+    }
+
+    public NodeEnvironment newNodeEnvironment() throws IOException {
+        return newNodeEnvironment(ImmutableSettings.EMPTY);
+    }
+
+    public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
+        Settings build = ImmutableSettings.builder()
+                .put(settings)
+                .put("path.home", newTempDirPath().toAbsolutePath().toString())
+                .put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true)
+                .putArray("path.data", tmpPaths()).build();
+        return new NodeEnvironment(build, new Environment(build));
+    }
+
+    public NodeEnvironment newNodeEnvironment(String[] dataPaths, Settings settings) throws IOException {
+        Settings build = ImmutableSettings.builder()
+                .put(settings)
+                .put("path.home", newTempDirPath().toAbsolutePath().toString())
+                .put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true)
+                .putArray("path.data", dataPaths).build();
+        return new NodeEnvironment(build, new Environment(build));
+    }
 }

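The assertions above pin down the two custom-path layouts: with the node ordinal segment (the default), /tmp/foo resolves to /tmp/foo/0/myindex/0; with ADD_NODE_ID_TO_CUSTOM_PATH set to false the segment is dropped, giving /tmp/foo/myindex/0. A small sketch of that layout rule, derived from the asserted behavior; the helper resolveCustomShardPath is hypothetical:

    // Layout rule asserted by testCustomDataPaths (sketch, not the real
    // NodeEnvironment code): custom shard paths are
    //   <data_path>[/<node ordinal>]/<index name>/<shard id>
    static Path resolveCustomShardPath(Path dataPath, int nodeOrdinal, String index,
                                       int shard, boolean addNodeId) {
        Path base = addNodeId ? dataPath.resolve(Integer.toString(nodeOrdinal)) : dataPath;
        return base.resolve(index).resolve(Integer.toString(shard));
    }
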
@ -20,15 +20,15 @@ package org.elasticsearch.gateway;

 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchTestCase;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;

 public class GatewayShardStateTests extends ElasticsearchTestCase {

@ -0,0 +1,159 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices;
+
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.*;
+
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests for custom data path locations and templates
+ */
+public class IndicesCustomDataPathTests extends ElasticsearchIntegrationTest {
+
+    private String path;
+
+    @Before
+    public void setup() {
+        path = newTempDirPath().toAbsolutePath().toString();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        IOUtils.deleteFilesIgnoringExceptions(Paths.get(path));
+    }
+
+    @Test
+    public void testDataPathCanBeChanged() throws Exception {
+        final String INDEX = "idx";
+        Path root = newTempDirPath();
+        Path startDir = root.resolve("start");
+        Path endDir = root.resolve("end");
+        logger.info("--> start dir: [{}]", startDir.toAbsolutePath().toString());
+        logger.info("--> end dir: [{}]", endDir.toAbsolutePath().toString());
+        // temp dirs are automatically created, but the end dir is what
+        // startDir is going to be renamed as, so it needs to be deleted
+        // otherwise we get all sorts of errors about the directory
+        // already existing
+        IOUtils.rm(endDir);
+
+        ImmutableSettings.Builder sb = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH,
+                startDir.toAbsolutePath().toString());
+        ImmutableSettings.Builder sb2 = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH,
+                endDir.toAbsolutePath().toString());
+
+        logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString());
+        client().admin().indices().prepareCreate(INDEX).setSettings(sb).get();
+        ensureGreen(INDEX);
+
+        indexRandom(true, client().prepareIndex(INDEX, "doc", "1").setSource("{\"body\": \"foo\"}"));
+
+        SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
+        assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
+
+        logger.info("--> closing the index [{}]", INDEX);
+        client().admin().indices().prepareClose(INDEX).get();
+        logger.info("--> index closed, re-opening...");
+        client().admin().indices().prepareOpen(INDEX).get();
+        logger.info("--> index re-opened");
+        ensureGreen(INDEX);
+
+        resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
+        assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
+
+        // Now, try closing and changing the settings
+
+        logger.info("--> closing the index [{}]", INDEX);
+        client().admin().indices().prepareClose(INDEX).get();
+
+        logger.info("--> moving data on disk [{}] to [{}]", startDir.getFileName(), endDir.getFileName());
+        assert Files.exists(endDir) == false : "end directory should not exist!";
+        Files.move(startDir, endDir, StandardCopyOption.REPLACE_EXISTING);
+
+        logger.info("--> updating settings...");
+        client().admin().indices().prepareUpdateSettings(INDEX)
+                .setSettings(sb2)
+                .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
+                .get();
+
+        assert Files.exists(startDir) == false : "start dir shouldn't exist";
+
+        logger.info("--> settings updated and files moved, re-opening index");
+        client().admin().indices().prepareOpen(INDEX).get();
+        logger.info("--> index re-opened");
+        ensureGreen(INDEX);
+
+        resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
+        assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
+
+        assertAcked(client().admin().indices().prepareDelete(INDEX));
+        assertPathHasBeenCleared(startDir.toAbsolutePath().toString());
+        assertPathHasBeenCleared(endDir.toAbsolutePath().toString());
+    }
+
+    @Test
+    public void testIndexCreatedWithCustomPathAndTemplate() throws Exception {
+        final String INDEX = "myindex2";
+
+        logger.info("--> creating an index with data_path [{}]", path);
+        ImmutableSettings.Builder sb = ImmutableSettings.builder().put(IndexMetaData.SETTING_DATA_PATH, path);
+
+        client().admin().indices().prepareCreate(INDEX).setSettings(sb).get();
+        ensureGreen(INDEX);
+
+        indexRandom(true, client().prepareIndex(INDEX, "doc", "1").setSource("{\"body\": \"foo\"}"));
+
+        SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
+        assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
+        assertAcked(client().admin().indices().prepareDelete(INDEX));
+        assertPathHasBeenCleared(path);
+    }
+
+    private void assertPathHasBeenCleared(String path) throws Exception {
+        int count = 0;
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        if (Files.exists(Paths.get(path))) {
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(path))) {
+                for (Path file : stream) {
+                    if (Files.isRegularFile(file)) {
+                        count++;
+                        sb.append(file.toAbsolutePath().toString());
+                        sb.append("\n");
+                    }
+                }
+            }
+        }
+        sb.append("]");
+        assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
+    }
+}

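The first test above encodes the supported recipe for relocating an index's data: close the index, move the files on disk, update index.data_path, then reopen. A condensed sketch of that sequence, using the same client calls as the test; "idx", oldDir, and newDir are placeholders:

    // Condensed relocation recipe drawn from testDataPathCanBeChanged.
    client().admin().indices().prepareClose("idx").get();
    Files.move(oldDir, newDir, StandardCopyOption.REPLACE_EXISTING);
    client().admin().indices().prepareUpdateSettings("idx")
            .setSettings(ImmutableSettings.builder()
                    .put(IndexMetaData.SETTING_DATA_PATH, newDir.toAbsolutePath().toString()))
            .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
            .get();
    client().admin().indices().prepareOpen("idx").get();

Note that the test performs the move and the settings update only while the index is closed, and verifies afterwards that the old directory is gone and the documents are still searchable.
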
@ -36,7 +36,6 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.Test;

 import java.nio.file.Files;

@ -61,6 +61,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -739,6 +740,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         if (numberOfReplicas >= 0) {
             builder.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
         }
+        // 30% of the time
+        if (randomInt(9) < 3) {
+            String dataPath = "data/custom-" + CHILD_JVM_ID + "/" + UUID.randomUUID().toString();
+            logger.info("using custom data_path for index: [{}]", dataPath);
+            builder.put(IndexMetaData.SETTING_DATA_PATH, dataPath);
+        }
         return builder.build();
     }

@ -276,6 +276,7 @@ public final class InternalTestCluster extends TestCluster {
         builder.put("script.disable_dynamic", false);
         builder.put("http.pipelining", enableHttpPipelining);
         builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false);
+        builder.put(NodeEnvironment.SETTING_CUSTOM_DATA_PATH_ENABLED, true);
         if (Strings.hasLength(System.getProperty("es.logger.level"))) {
             builder.put("logger.level", System.getProperty("es.logger.level"));
         }